// nodedb_wal/double_write.rs
use std::fs::{File, OpenOptions};
26use std::io::{Read, Seek, SeekFrom, Write};
27use std::path::{Path, PathBuf};
28
29use crate::error::{Result, WalError};
30use crate::record::{HEADER_SIZE, RecordHeader, WAL_MAGIC, WalRecord};
31
/// Number of record slots in the buffer file; slots are reused round-robin.
const DWB_CAPACITY: usize = 64;

/// Size in bytes of the file header: three `u32` fields (magic, record count,
/// write position).
const DWB_HEADER_SIZE: usize = 3 * std::mem::size_of::<u32>();

/// Magic number stored in the first header field (`0x4457_4246`, the ASCII
/// bytes `DWBF` read as a big-endian integer).
const DWB_MAGIC: u32 = u32::from_be_bytes(*b"DWBF");

/// A fixed-capacity, file-backed ring of WAL record copies.
///
/// The file starts with a `DWB_HEADER_SIZE`-byte header followed by
/// `DWB_CAPACITY` equally sized slots, each holding one length-prefixed record.
pub struct DoubleWriteBuffer {
    /// Open read/write handle on the buffer file.
    file: File,
    /// Path the buffer file was opened from.
    path: PathBuf,
    /// Monotonic (wrapping) write counter; the target slot is
    /// `write_pos % DWB_CAPACITY`.
    write_pos: u32,
    /// Number of records written so far, saturating at `DWB_CAPACITY`.
    count: u32,
    /// `true` when records have been written since the last `flush`.
    dirty: bool,
}
56
57impl std::fmt::Debug for DoubleWriteBuffer {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("DoubleWriteBuffer")
60 .field("path", &self.path)
61 .field("write_pos", &self.write_pos)
62 .field("count", &self.count)
63 .finish()
64 }
65}
66
67impl DoubleWriteBuffer {
68 pub fn open(path: &Path) -> Result<Self> {
70 let file = OpenOptions::new()
71 .read(true)
72 .write(true)
73 .create(true)
74 .truncate(false)
75 .open(path)
76 .map_err(|e| {
77 tracing::warn!(path = %path.display(), error = %e, "failed to open double-write buffer");
78 WalError::Io(e)
79 })?;
80
81 let mut dwb = Self {
82 file,
83 path: path.to_path_buf(),
84 write_pos: 0,
85 count: 0,
86 dirty: false,
87 };
88
89 let file_len = dwb.file.metadata().map(|m| m.len()).unwrap_or(0);
91 if file_len >= DWB_HEADER_SIZE as u64 {
92 let mut header = [0u8; DWB_HEADER_SIZE];
93 dwb.file.seek(SeekFrom::Start(0)).map_err(WalError::Io)?;
94 if dwb.file.read_exact(&mut header).is_ok() {
95 let mut arr4 = [0u8; 4];
96 arr4.copy_from_slice(&header[0..4]);
97 let magic = u32::from_le_bytes(arr4);
98 if magic == DWB_MAGIC {
99 arr4.copy_from_slice(&header[4..8]);
100 dwb.count = u32::from_le_bytes(arr4);
101 arr4.copy_from_slice(&header[8..12]);
102 dwb.write_pos = u32::from_le_bytes(arr4);
103 }
104 }
105 }
106
107 Ok(dwb)
108 }
109
110 pub fn write_record(&mut self, record: &WalRecord) -> Result<()> {
116 self.write_record_deferred(record)?;
117 self.flush()
118 }
119
120 pub fn write_record_deferred(&mut self, record: &WalRecord) -> Result<()> {
127 let record_bytes = record.header.to_bytes();
128 let total_size = HEADER_SIZE + record.payload.len();
129
130 if total_size > 64 * 1024 {
133 return Ok(()); }
135
136 let slot_offset = DWB_HEADER_SIZE as u64
139 + (self.write_pos as u64 % DWB_CAPACITY as u64) * (4 + HEADER_SIZE as u64 + 64 * 1024);
140
141 self.file
142 .seek(SeekFrom::Start(slot_offset))
143 .map_err(WalError::Io)?;
144 self.file
145 .write_all(&(total_size as u32).to_le_bytes())
146 .map_err(WalError::Io)?;
147 self.file.write_all(&record_bytes).map_err(WalError::Io)?;
148 self.file.write_all(&record.payload).map_err(WalError::Io)?;
149
150 self.write_pos = self.write_pos.wrapping_add(1);
152 self.count = self.count.saturating_add(1).min(DWB_CAPACITY as u32);
153 self.dirty = true;
154
155 Ok(())
156 }
157
158 pub fn flush(&mut self) -> Result<()> {
164 if !self.dirty {
165 return Ok(());
166 }
167
168 let mut header = [0u8; DWB_HEADER_SIZE];
171 header[0..4].copy_from_slice(&DWB_MAGIC.to_le_bytes());
172 header[4..8].copy_from_slice(&self.count.to_le_bytes());
173 header[8..12].copy_from_slice(&self.write_pos.to_le_bytes());
174
175 self.file.seek(SeekFrom::Start(0)).map_err(WalError::Io)?;
176 self.file.write_all(&header).map_err(WalError::Io)?;
177
178 self.file.sync_all().map_err(WalError::Io)?;
179 self.dirty = false;
180
181 Ok(())
182 }
183
184 pub fn path(&self) -> &Path {
186 &self.path
187 }
188
189 pub fn recover_record(&mut self, target_lsn: u64) -> Result<Option<WalRecord>> {
197 let slot_size = 4 + HEADER_SIZE as u64 + 64 * 1024;
198
199 for i in 0..DWB_CAPACITY {
200 let slot_offset = DWB_HEADER_SIZE as u64 + (i as u64) * slot_size;
201
202 self.file
203 .seek(SeekFrom::Start(slot_offset))
204 .map_err(WalError::Io)?;
205
206 let mut size_buf = [0u8; 4];
207 if self.file.read_exact(&mut size_buf).is_err() {
208 continue;
209 }
210 let total_size = u32::from_le_bytes(size_buf) as usize;
211 if !(HEADER_SIZE..=64 * 1024).contains(&total_size) {
212 continue;
213 }
214
215 let mut header_buf = [0u8; HEADER_SIZE];
216 if self.file.read_exact(&mut header_buf).is_err() {
217 continue;
218 }
219 let header = RecordHeader::from_bytes(&header_buf);
220
221 if header.magic != WAL_MAGIC || header.lsn != target_lsn {
222 continue;
223 }
224
225 let payload_len = total_size - HEADER_SIZE;
226 let mut payload = vec![0u8; payload_len];
227 if self.file.read_exact(&mut payload).is_err() {
228 continue;
229 }
230
231 let record = WalRecord { header, payload };
232 if record.verify_checksum().is_ok() {
233 return Ok(Some(record));
234 }
235 }
236
237 Ok(None)
238 }
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Builds a `Put` record with the given LSN and payload; every other
    /// constructor argument is the fixed value used throughout these tests.
    fn put_record(lsn: u64, payload: Vec<u8>) -> WalRecord {
        WalRecord::new(RecordType::Put as u16, lsn, 1, 0, payload, None).unwrap()
    }

    /// Opens (creating if needed) the buffer file `name` inside `dir`.
    fn open_in(dir: &tempfile::TempDir, name: &str) -> DoubleWriteBuffer {
        DoubleWriteBuffer::open(&dir.path().join(name)).unwrap()
    }

    /// A record written with an immediate flush can be read back by LSN.
    #[test]
    fn write_and_recover() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_in(&dir, "test.dwb");

        dwb.write_record(&put_record(42, b"hello double-write".to_vec()))
            .unwrap();

        let recovered = dwb.recover_record(42).unwrap();
        assert!(recovered.is_some());
        let rec = recovered.unwrap();
        assert_eq!(rec.header.lsn, 42);
        assert_eq!(rec.payload, b"hello double-write");
    }

    /// Looking up an LSN in a freshly created buffer yields `None`.
    #[test]
    fn recover_nonexistent_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_in(&dir, "test2.dwb");

        let result = dwb.recover_record(999).unwrap();
        assert!(result.is_none());
    }

    /// A flushed record is still recoverable after the buffer is dropped and
    /// the same file is opened again.
    #[test]
    fn survives_reopen() {
        let dir = tempfile::tempdir().unwrap();

        {
            let mut writer = open_in(&dir, "reopen.dwb");
            writer
                .write_record(&put_record(7, b"durable".to_vec()))
                .unwrap();
        }

        let mut reader = open_in(&dir, "reopen.dwb");
        let recovered = reader.recover_record(7).unwrap();
        assert!(recovered.is_some());
        assert_eq!(recovered.unwrap().payload, b"durable");
    }

    /// Several deferred writes followed by one flush are all recoverable.
    #[test]
    fn batch_deferred_writes_and_flush() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_in(&dir, "batch.dwb");
        let lsns = 1..=5u64;

        for lsn in lsns.clone() {
            let record = put_record(lsn, format!("batch-{lsn}").into_bytes());
            dwb.write_record_deferred(&record).unwrap();
        }
        assert!(dwb.dirty);

        dwb.flush().unwrap();
        assert!(!dwb.dirty);

        for lsn in lsns {
            let recovered = dwb.recover_record(lsn).unwrap();
            assert!(recovered.is_some(), "LSN {lsn} should be recoverable");
            assert_eq!(
                recovered.unwrap().payload,
                format!("batch-{lsn}").into_bytes()
            );
        }
    }

    /// Flushing a clean buffer, or flushing twice in a row, leaves it clean.
    #[test]
    fn flush_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_in(&dir, "idem.dwb");

        dwb.flush().unwrap();
        assert!(!dwb.dirty);

        dwb.write_record_deferred(&put_record(1, b"data".to_vec()))
            .unwrap();
        dwb.flush().unwrap();
        dwb.flush().unwrap();
        assert!(!dwb.dirty);
    }

    /// Writing more records than there are slots overwrites the oldest ones:
    /// the newest records are recoverable, the first five are gone.
    #[test]
    fn recover_after_wraparound() {
        let dir = tempfile::tempdir().unwrap();
        let mut dwb = open_in(&dir, "wrap.dwb");

        let total = super::DWB_CAPACITY as u64 + 5;
        for lsn in 1..=total {
            let record = put_record(lsn, format!("wrap-{lsn}").into_bytes());
            dwb.write_record_deferred(&record).unwrap();
        }
        dwb.flush().unwrap();

        let newest = (total - 4)..=total;
        for lsn in newest {
            let recovered = dwb.recover_record(lsn).unwrap();
            assert!(
                recovered.is_some(),
                "LSN {lsn} should be recoverable after wrap-around"
            );
            assert_eq!(
                recovered.unwrap().payload,
                format!("wrap-{lsn}").into_bytes()
            );
        }

        let overwritten = 1..=5u64;
        for lsn in overwritten {
            let recovered = dwb.recover_record(lsn).unwrap();
            assert!(
                recovered.is_none(),
                "LSN {lsn} should have been overwritten by wrap-around"
            );
        }
    }
}