1use std::fs::{File, OpenOptions};
41use std::io::{Read, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43use std::sync::atomic::{AtomicU64, Ordering};
44
45#[cfg(not(target_arch = "wasm32"))]
46use std::os::unix::fs::OpenOptionsExt as _;
47
48use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT, is_aligned};
49use crate::error::{Result, WalError};
50use crate::record::{HEADER_SIZE, RecordHeader, WAL_MAGIC, WalRecord};
51
52const DWB_CAPACITY: usize = 64;
60
61const DWB_SLOT_PAYLOAD_MAX: usize = 64 * 1024;
63
64const DWB_SLOT_RAW: usize = 4 + HEADER_SIZE + DWB_SLOT_PAYLOAD_MAX;
66
67const DWB_SLOT_STRIDE: usize = round_up_const(DWB_SLOT_RAW, DEFAULT_ALIGNMENT);
71
72const DWB_HEADER_STRIDE: usize = DEFAULT_ALIGNMENT;
76const DWB_HEADER_FIELDS: usize = 12;
77const DWB_MAGIC: u32 = 0x4457_4246; static DWB_BYTES_WRITTEN_TOTAL: AtomicU64 = AtomicU64::new(0);
83
84pub fn wal_dwb_bytes_written_total() -> u64 {
86 DWB_BYTES_WRITTEN_TOTAL.load(Ordering::Relaxed)
87}
88
/// How (or whether) WAL records are staged in the double-write buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DwbMode {
    /// Double-writing is disabled; [`DoubleWriteBuffer::open`] rejects this
    /// mode with `WalError::DwbOffNotOpenable`.
    Off,
    /// The DWB file is opened without `O_DIRECT`, and records are written with
    /// ordinary seek + write calls.
    Buffered,
    /// The DWB file is opened with `O_DIRECT` (on non-wasm32 targets), and
    /// every write is a full aligned stride staged in an aligned buffer.
    Direct,
}

impl DwbMode {
    /// Picks the mode matching the parent WAL's I/O strategy.
    ///
    /// Returns [`DwbMode::Direct`] when the parent uses direct I/O and
    /// [`DwbMode::Buffered`] otherwise. Never returns [`DwbMode::Off`].
    pub fn default_for_parent(parent_uses_direct_io: bool) -> Self {
        match parent_uses_direct_io {
            true => Self::Direct,
            false => Self::Buffered,
        }
    }
}
115
/// Rounds `value` up to the next multiple of `align`, in a `const` context.
///
/// `align` is expected to be a power of two; the bit-mask trick below is
/// only correct for powers of two.
const fn round_up_const(value: usize, align: usize) -> usize {
    let mask = align - 1;
    (value + mask) & !mask
}
119
120pub const fn slot_stride() -> usize {
123 DWB_SLOT_STRIDE
124}
125
126fn slot_offset(idx: u32) -> u64 {
128 DWB_HEADER_STRIDE as u64 + (idx as u64 % DWB_CAPACITY as u64) * DWB_SLOT_STRIDE as u64
129}
130
131pub struct DoubleWriteBuffer {
133 file: File,
134 path: PathBuf,
135 mode: DwbMode,
136 write_pos: u32,
138 count: u32,
140 dirty: bool,
142 slot_buf: Option<AlignedBuf>,
145 header_buf: Option<AlignedBuf>,
147}
148
149impl std::fmt::Debug for DoubleWriteBuffer {
150 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 f.debug_struct("DoubleWriteBuffer")
152 .field("path", &self.path)
153 .field("mode", &self.mode)
154 .field("write_pos", &self.write_pos)
155 .field("count", &self.count)
156 .finish()
157 }
158}
159
160impl DoubleWriteBuffer {
161 pub fn open(path: &Path, mode: DwbMode) -> Result<Self> {
166 if mode == DwbMode::Off {
167 return Err(WalError::DwbOffNotOpenable);
168 }
169
170 let mut opts = OpenOptions::new();
171 opts.read(true).write(true).create(true).truncate(false);
172 #[cfg(not(target_arch = "wasm32"))]
173 if mode == DwbMode::Direct {
174 opts.custom_flags(libc::O_DIRECT);
175 }
176
177 let file = opts.open(path).map_err(|e| {
178 tracing::warn!(path = %path.display(), error = %e, mode = ?mode, "failed to open double-write buffer");
179 WalError::Io(e)
180 })?;
181
182 let (slot_buf, header_buf) = if mode == DwbMode::Direct {
183 (
184 Some(AlignedBuf::new(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT)?),
185 Some(AlignedBuf::new(DWB_HEADER_STRIDE, DEFAULT_ALIGNMENT)?),
186 )
187 } else {
188 (None, None)
189 };
190
191 let mut dwb = Self {
192 file,
193 path: path.to_path_buf(),
194 mode,
195 write_pos: 0,
196 count: 0,
197 dirty: false,
198 slot_buf,
199 header_buf,
200 };
201
202 let file_len = dwb.file.metadata().map(|m| m.len()).unwrap_or(0);
204 if file_len >= DWB_HEADER_STRIDE as u64 {
205 let mut block = vec![0u8; DWB_HEADER_STRIDE];
206 dwb.file.seek(SeekFrom::Start(0)).map_err(WalError::Io)?;
207 if dwb.file.read_exact(&mut block).is_ok() {
208 let mut arr4 = [0u8; 4];
209 arr4.copy_from_slice(&block[0..4]);
210 let magic = u32::from_le_bytes(arr4);
211 if magic == DWB_MAGIC {
212 arr4.copy_from_slice(&block[4..8]);
213 dwb.count = u32::from_le_bytes(arr4);
214 arr4.copy_from_slice(&block[8..12]);
215 dwb.write_pos = u32::from_le_bytes(arr4);
216 }
217 }
218 }
219
220 Ok(dwb)
221 }
222
223 pub fn mode(&self) -> DwbMode {
225 self.mode
226 }
227
228 pub fn write_record(&mut self, record: &WalRecord) -> Result<()> {
234 self.write_record_deferred(record)?;
235 self.flush()
236 }
237
238 pub fn write_record_deferred(&mut self, record: &WalRecord) -> Result<()> {
246 let total_size = HEADER_SIZE + record.payload.len();
247
248 if total_size > DWB_SLOT_PAYLOAD_MAX {
251 return Ok(()); }
253
254 let header_bytes = record.header.to_bytes();
255 let offset = slot_offset(self.write_pos);
256
257 match self.mode {
258 DwbMode::Off => unreachable!("Off never opens a DoubleWriteBuffer"),
259 DwbMode::Buffered => {
260 self.file
261 .seek(SeekFrom::Start(offset))
262 .map_err(WalError::Io)?;
263 self.file
264 .write_all(&(total_size as u32).to_le_bytes())
265 .map_err(WalError::Io)?;
266 self.file.write_all(&header_bytes).map_err(WalError::Io)?;
267 self.file.write_all(&record.payload).map_err(WalError::Io)?;
268 DWB_BYTES_WRITTEN_TOTAL.fetch_add(
269 (4 + header_bytes.len() + record.payload.len()) as u64,
270 Ordering::Relaxed,
271 );
272 }
273 DwbMode::Direct => {
274 let buf = self
275 .slot_buf
276 .as_mut()
277 .expect("slot_buf present in Direct mode");
278 buf.clear();
279 buf.write(&(total_size as u32).to_le_bytes());
280 buf.write(&header_bytes);
281 buf.write(&record.payload);
282 zero_tail(buf);
285 let slice = full_capacity_slice(buf);
286 debug_assert_eq!(slice.len(), DWB_SLOT_STRIDE);
287 debug_assert!(is_aligned(offset as usize, DEFAULT_ALIGNMENT));
288 pwrite_all(&self.file, slice, offset)?;
289 DWB_BYTES_WRITTEN_TOTAL.fetch_add(slice.len() as u64, Ordering::Relaxed);
290 }
291 }
292
293 self.write_pos = self.write_pos.wrapping_add(1);
294 self.count = self.count.saturating_add(1).min(DWB_CAPACITY as u32);
295 self.dirty = true;
296
297 Ok(())
298 }
299
300 pub fn flush(&mut self) -> Result<()> {
306 if !self.dirty {
307 return Ok(());
308 }
309
310 let mut header = [0u8; DWB_HEADER_FIELDS];
311 header[0..4].copy_from_slice(&DWB_MAGIC.to_le_bytes());
312 header[4..8].copy_from_slice(&self.count.to_le_bytes());
313 header[8..12].copy_from_slice(&self.write_pos.to_le_bytes());
314
315 match self.mode {
316 DwbMode::Off => unreachable!("invariant: flush() is gated on mode != Off by caller"),
317 DwbMode::Buffered => {
318 self.file.seek(SeekFrom::Start(0)).map_err(WalError::Io)?;
319 self.file.write_all(&header).map_err(WalError::Io)?;
320 DWB_BYTES_WRITTEN_TOTAL.fetch_add(header.len() as u64, Ordering::Relaxed);
321 }
322 DwbMode::Direct => {
323 let buf = self
324 .header_buf
325 .as_mut()
326 .expect("header_buf present in Direct mode");
327 buf.clear();
328 buf.write(&header);
329 zero_tail(buf);
330 let slice = full_capacity_slice(buf);
331 debug_assert_eq!(slice.len(), DWB_HEADER_STRIDE);
332 pwrite_all(&self.file, slice, 0)?;
333 DWB_BYTES_WRITTEN_TOTAL.fetch_add(slice.len() as u64, Ordering::Relaxed);
334 }
335 }
336
337 self.file.sync_all().map_err(WalError::Io)?;
338 self.dirty = false;
339
340 Ok(())
341 }
342
343 pub fn path(&self) -> &Path {
345 &self.path
346 }
347
348 pub fn recover_record(&mut self, target_lsn: u64) -> Result<Option<WalRecord>> {
356 #[cfg(target_arch = "wasm32")]
357 {
358 let _ = target_lsn;
359 return Ok(None);
360 }
361
362 #[cfg(not(target_arch = "wasm32"))]
363 {
364 use std::os::unix::io::AsRawFd as _;
365
366 let mut slot = AlignedBuf::new(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT)?;
369
370 for i in 0..DWB_CAPACITY as u32 {
371 let offset = slot_offset(i);
372 let read = unsafe {
374 libc::pread(
375 self.file.as_raw_fd(),
376 slot.as_mut_ptr() as *mut libc::c_void,
377 DWB_SLOT_STRIDE,
378 offset as libc::off_t,
379 )
380 };
381 if read <= 0 {
382 continue;
383 }
384 let bytes: &[u8] =
386 unsafe { std::slice::from_raw_parts(slot.as_ptr(), read as usize) };
387 if bytes.len() < 4 + HEADER_SIZE {
388 continue;
389 }
390
391 let mut arr4 = [0u8; 4];
392 arr4.copy_from_slice(&bytes[0..4]);
393 let total_size = u32::from_le_bytes(arr4) as usize;
394 if !(HEADER_SIZE..=DWB_SLOT_PAYLOAD_MAX).contains(&total_size)
395 || bytes.len() < 4 + total_size
396 {
397 continue;
398 }
399
400 let mut header_buf = [0u8; HEADER_SIZE];
401 header_buf.copy_from_slice(&bytes[4..4 + HEADER_SIZE]);
402 let header = RecordHeader::from_bytes(&header_buf);
403 if header.magic != WAL_MAGIC || header.lsn != target_lsn {
404 continue;
405 }
406
407 let payload_len = total_size - HEADER_SIZE;
408 let payload = bytes[4 + HEADER_SIZE..4 + HEADER_SIZE + payload_len].to_vec();
409 let record = WalRecord { header, payload };
410 if record.verify_checksum().is_ok() {
411 return Ok(Some(record));
412 }
413 }
414
415 Ok(None)
416 }
417 }
418}
419
420fn zero_tail(buf: &mut AlignedBuf) {
423 let written = buf.len();
424 let cap = buf.capacity();
425 if written < cap {
426 unsafe {
429 std::ptr::write_bytes(buf.as_mut_ptr().add(written), 0, cap - written);
430 }
431 }
432}
433
434fn full_capacity_slice(buf: &AlignedBuf) -> &[u8] {
437 unsafe { std::slice::from_raw_parts(buf.as_ptr(), buf.capacity()) }
440}
441
442fn pwrite_all(file: &File, data: &[u8], offset: u64) -> Result<()> {
444 #[cfg(not(target_arch = "wasm32"))]
445 {
446 use std::os::unix::io::AsRawFd as _;
447 let fd = file.as_raw_fd();
448 let mut remaining = data;
449 let mut write_offset = offset;
450 while !remaining.is_empty() {
451 let n = unsafe {
452 libc::pwrite(
453 fd,
454 remaining.as_ptr() as *const libc::c_void,
455 remaining.len(),
456 write_offset as libc::off_t,
457 )
458 };
459 if n < 0 {
460 return Err(WalError::Io(std::io::Error::last_os_error()));
461 }
462 let n = n as usize;
463 remaining = &remaining[n..];
464 write_offset += n as u64;
465 }
466 Ok(())
467 }
468 #[cfg(target_arch = "wasm32")]
469 {
470 let _ = (file, data, offset);
471 Err(WalError::Unsupported {
472 detail: "O_DIRECT pwrite not available on wasm32",
473 })
474 }
475}
476
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Opens a Buffered-mode DWB at `path`, panicking on failure.
    fn open_buffered(path: &Path) -> DoubleWriteBuffer {
        DoubleWriteBuffer::open(path, DwbMode::Buffered).unwrap()
    }

    /// Builds a `Put` record with the given LSN and payload, using the same
    /// fixed header arguments throughout these tests.
    fn put_record(lsn: u64, payload: &[u8]) -> WalRecord {
        WalRecord::new(
            RecordType::Put as u32,
            lsn,
            1,
            0,
            0,
            payload.to_vec(),
            None,
            None,
        )
        .unwrap()
    }

    #[test]
    fn write_and_recover() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("test.dwb"));

        dwb.write_record(&put_record(42, b"hello double-write"))
            .unwrap();

        let rec = dwb
            .recover_record(42)
            .unwrap()
            .expect("LSN 42 should be recoverable");
        assert_eq!(rec.header.lsn, 42);
        assert_eq!(rec.payload, b"hello double-write");
    }

    #[test]
    fn recover_nonexistent_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("test2.dwb"));
        assert!(dwb.recover_record(999).unwrap().is_none());
    }

    #[test]
    fn survives_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("reopen.dwb");

        {
            let mut dwb = open_buffered(&dwb_path);
            dwb.write_record(&put_record(7, b"durable")).unwrap();
        }

        let mut dwb = open_buffered(&dwb_path);
        let rec = dwb
            .recover_record(7)
            .unwrap()
            .expect("LSN 7 should survive reopen");
        assert_eq!(rec.payload, b"durable");
    }

    #[test]
    fn batch_deferred_writes_and_flush() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("batch.dwb"));

        for lsn in 1..=5u64 {
            dwb.write_record_deferred(&put_record(lsn, format!("batch-{lsn}").as_bytes()))
                .unwrap();
        }

        assert!(dwb.dirty);
        dwb.flush().unwrap();
        assert!(!dwb.dirty);

        for lsn in 1..=5u64 {
            let rec = dwb
                .recover_record(lsn)
                .unwrap()
                .unwrap_or_else(|| panic!("LSN {lsn} should be recoverable"));
            assert_eq!(rec.payload, format!("batch-{lsn}").into_bytes());
        }
    }

    #[test]
    fn flush_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("idem.dwb"));

        dwb.flush().unwrap();
        assert!(!dwb.dirty);

        dwb.write_record_deferred(&put_record(1, b"data")).unwrap();
        dwb.flush().unwrap();
        dwb.flush().unwrap();
        assert!(!dwb.dirty);
    }

    #[test]
    fn slot_stride_is_o_direct_aligned() {
        assert!(
            is_aligned(DWB_SLOT_STRIDE, DEFAULT_ALIGNMENT),
            "DWB slot stride {DWB_SLOT_STRIDE} bytes is not a multiple of {DEFAULT_ALIGNMENT}"
        );
        assert!(is_aligned(DWB_HEADER_STRIDE, DEFAULT_ALIGNMENT));
        for i in 0..DWB_CAPACITY as u32 {
            assert!(is_aligned(slot_offset(i) as usize, DEFAULT_ALIGNMENT));
        }
    }

    #[test]
    fn recover_after_wraparound() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_buffered(&dir.path().join("wrap.dwb"));

        let total = DWB_CAPACITY as u64 + 5;
        for lsn in 1..=total {
            dwb.write_record_deferred(&put_record(lsn, format!("wrap-{lsn}").as_bytes()))
                .unwrap();
        }
        dwb.flush().unwrap();

        for lsn in (total - 4)..=total {
            let rec = dwb
                .recover_record(lsn)
                .unwrap()
                .unwrap_or_else(|| panic!("LSN {lsn} should be recoverable after wrap-around"));
            assert_eq!(rec.payload, format!("wrap-{lsn}").into_bytes());
        }

        for lsn in 1..=5u64 {
            assert!(
                dwb.recover_record(lsn).unwrap().is_none(),
                "LSN {lsn} should have been overwritten by wrap-around"
            );
        }
    }

    #[test]
    fn bytes_written_counter_increments() {
        let dir = tempfile::tempdir().unwrap();
        let before = wal_dwb_bytes_written_total();

        let mut dwb = open_buffered(&dir.path().join("counter.dwb"));
        dwb.write_record(&put_record(1, b"counted")).unwrap();

        assert!(wal_dwb_bytes_written_total() > before);
    }

    #[test]
    fn open_off_is_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let result = DoubleWriteBuffer::open(&dir.path().join("off.dwb"), DwbMode::Off);
        assert!(matches!(result, Err(WalError::DwbOffNotOpenable)));
    }

    #[test]
    fn reopen_restores_ring_position() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("ring.dwb");

        {
            let mut dwb = open_buffered(&dwb_path);
            for lsn in 1..=3u64 {
                dwb.write_record_deferred(&put_record(lsn, format!("ring-{lsn}").as_bytes()))
                    .unwrap();
            }
            dwb.flush().unwrap();
        }

        let mut dwb = open_buffered(&dwb_path);
        assert_eq!(dwb.write_pos, 3);
        assert_eq!(dwb.count, 3);

        // The next record must land in slot 3, not clobber slot 0.
        dwb.write_record(&put_record(4, b"ring-4")).unwrap();
        for lsn in 1..=4u64 {
            assert!(
                dwb.recover_record(lsn).unwrap().is_some(),
                "LSN {lsn} should be recoverable after reopen"
            );
        }
    }

    #[test]
    fn direct_mode_reopen_restores_ring_position() {
        let dir = tempfile::tempdir().unwrap();
        let dwb_path = dir.path().join("direct.dwb");

        // Some filesystems (e.g. tmpfs) reject O_DIRECT; nothing to test there.
        let mut dwb = match DoubleWriteBuffer::open(&dwb_path, DwbMode::Direct) {
            Ok(dwb) => dwb,
            Err(_) => return,
        };
        for lsn in 1..=3u64 {
            dwb.write_record(&put_record(lsn, b"direct")).unwrap();
        }
        drop(dwb);

        let mut dwb = DoubleWriteBuffer::open(&dwb_path, DwbMode::Direct).unwrap();
        assert_eq!(dwb.write_pos, 3);
        assert_eq!(dwb.count, 3);

        let rec = dwb
            .recover_record(2)
            .unwrap()
            .expect("LSN 2 should be recoverable in Direct mode");
        assert_eq!(rec.payload, b"direct");
    }
}