Skip to main content

nodedb_wal/
writer.rs

//! WAL writer with O_DIRECT and group commit.
//!
//! The writer accumulates records into an aligned buffer and writes it to disk
//! when the buffer cannot fit the next record or when an explicit sync is
//! requested.
//!
//! ## I/O path
//!
//! 1. The caller submits a record (type, tenant, vShard, payload) through
//!    `WalWriter::append`, which builds the `WalRecord` and assigns its LSN.
//! 2. The writer serializes the record into the aligned write buffer.
//! 3. When the next record does not fit, the buffer is written to the WAL
//!    file with `pwrite` (through `O_DIRECT` when enabled) but not fsynced.
//!    `sync()` writes whatever is still buffered and then `fsync`s the file.
//! 4. Group commit: all records appended between two `sync()` calls share a
//!    single `fsync`.
//!
//! ## Future: io_uring
//!
//! The current implementation uses standard `pwrite` + `fsync`, optionally with
//! O_DIRECT. Phase 1 validates correctness and determinism. io_uring submission
//! will be added once the bridge crate provides the TPC event loop integration.

use std::fs::{File, OpenOptions};
use std::os::unix::fs::{FileExt, OpenOptionsExt};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};

use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT};
use crate::double_write::DwbMode;
use crate::error::{Result, WalError};
use crate::record::{HEADER_SIZE, WalRecord};

/// Default capacity of the write buffer: 2 MiB.
///
/// The write buffer is the group-commit batch: appended records collect here
/// until the next record no longer fits or `sync()` is called.
///
/// Matches the `WalTuning::write_buffer_size` default; override it per writer
/// through `WalWriterConfig::write_buffer_size`.
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 2 << 20;

40/// Configuration for the WAL writer.
41#[derive(Debug, Clone)]
42pub struct WalWriterConfig {
43    /// Size of the aligned write buffer (rounded up to alignment).
44    pub write_buffer_size: usize,
45
46    /// O_DIRECT alignment (typically 4096 for NVMe).
47    pub alignment: usize,
48
49    /// Whether to use O_DIRECT. Set to `false` for testing on filesystems
50    /// that don't support it (e.g., tmpfs).
51    pub use_direct_io: bool,
52
53    /// Double-write buffer I/O mode. `None` means "mirror the parent" —
54    /// `Direct` when `use_direct_io` is true, `Buffered` otherwise.
55    /// `Some(DwbMode::Off)` disables the DWB entirely.
56    pub dwb_mode: Option<DwbMode>,
57}
58
59impl Default for WalWriterConfig {
60    fn default() -> Self {
61        Self {
62            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
63            alignment: DEFAULT_ALIGNMENT,
64            use_direct_io: true,
65            dwb_mode: None,
66        }
67    }
68}
69
70fn resolve_dwb_mode(config: &WalWriterConfig) -> DwbMode {
71    config
72        .dwb_mode
73        .unwrap_or_else(|| DwbMode::default_for_parent(config.use_direct_io))
74}
75
76fn open_dwb_for(
77    config: &WalWriterConfig,
78    path: &Path,
79) -> Option<crate::double_write::DoubleWriteBuffer> {
80    let mode = resolve_dwb_mode(config);
81    if mode == DwbMode::Off {
82        return None;
83    }
84    let dwb_path = path.with_extension("dwb");
85    match crate::double_write::DoubleWriteBuffer::open(&dwb_path, mode) {
86        Ok(d) => Some(d),
87        Err(e) => {
88            tracing::warn!(
89                path = %dwb_path.display(),
90                error = %e,
91                mode = ?mode,
92                "failed to open DWB — torn-write protection disabled for this writer"
93            );
94            None
95        }
96    }
97}
98
99/// Append-only WAL writer.
100pub struct WalWriter {
101    /// The WAL file handle (opened with O_DIRECT if configured).
102    file: File,
103
104    /// Aligned write buffer for batching records before flush.
105    buffer: AlignedBuf,
106
107    /// Current file write offset (always aligned).
108    file_offset: u64,
109
110    /// Next LSN to assign.
111    next_lsn: AtomicU64,
112
113    /// Whether the writer has been sealed (no more writes accepted).
114    sealed: bool,
115
116    /// Configuration.
117    config: WalWriterConfig,
118
119    /// Optional key ring for payload encryption (supports key rotation).
120    encryption_ring: Option<crate::crypto::KeyRing>,
121
122    /// Optional double-write buffer for torn write protection.
123    /// Records are written here before the WAL for crash recovery.
124    double_write: Option<crate::double_write::DoubleWriteBuffer>,
125}
126
127impl WalWriter {
128    /// Open or create a WAL file at the given path.
129    pub fn open(path: &Path, config: WalWriterConfig) -> Result<Self> {
130        let mut opts = OpenOptions::new();
131        opts.create(true).write(true).append(false);
132
133        if config.use_direct_io {
134            // O_DIRECT: bypass page cache.
135            opts.custom_flags(libc::O_DIRECT);
136        }
137
138        let file = opts.open(path)?;
139
140        let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
141
142        // Scan existing WAL for recovery if the file has data.
143        let (file_offset, next_lsn) = if path.exists() && std::fs::metadata(path)?.len() > 0 {
144            let info = crate::recovery::recover(path)?;
145            (info.end_offset, info.next_lsn())
146        } else {
147            (0, 1)
148        };
149
150        let double_write = open_dwb_for(&config, path);
151
152        Ok(Self {
153            file,
154            buffer,
155            file_offset,
156            next_lsn: AtomicU64::new(next_lsn),
157            sealed: false,
158            config,
159            encryption_ring: None,
160            double_write,
161        })
162    }
163
164    /// Set the encryption key. When set, all subsequent records will have
165    /// their payloads encrypted with AES-256-GCM.
166    pub fn set_encryption_key(&mut self, key: crate::crypto::WalEncryptionKey) {
167        self.encryption_ring = Some(crate::crypto::KeyRing::new(key));
168    }
169
170    /// Set the key ring directly (for key rotation with dual-key reads).
171    pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) {
172        self.encryption_ring = Some(ring);
173    }
174
175    /// Access the key ring (for decryption during replay).
176    pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
177        self.encryption_ring.as_ref()
178    }
179
180    /// Open a new WAL segment file with a specific starting LSN.
181    ///
182    /// Used by `SegmentedWal` when rolling to a new segment. The file must
183    /// not already exist (or be empty). The writer will assign LSNs starting
184    /// from `start_lsn`.
185    pub fn open_with_start_lsn(
186        path: &Path,
187        config: WalWriterConfig,
188        start_lsn: u64,
189    ) -> Result<Self> {
190        let mut opts = OpenOptions::new();
191        opts.create(true).write(true).append(false);
192
193        if config.use_direct_io {
194            opts.custom_flags(libc::O_DIRECT);
195        }
196
197        let file = opts.open(path)?;
198        let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
199
200        let double_write = open_dwb_for(&config, path);
201
202        Ok(Self {
203            file,
204            buffer,
205            file_offset: 0,
206            next_lsn: AtomicU64::new(start_lsn),
207            sealed: false,
208            config,
209            encryption_ring: None,
210            double_write,
211        })
212    }
213
214    /// Open a WAL writer with O_DIRECT disabled (for testing on tmpfs, etc.).
215    pub fn open_without_direct_io(path: &Path) -> Result<Self> {
216        Self::open(
217            path,
218            WalWriterConfig {
219                use_direct_io: false,
220                ..Default::default()
221            },
222        )
223    }
224
225    /// Append a record to the WAL. Returns the assigned LSN.
226    ///
227    /// The record is written to the in-memory buffer. Call `sync()` to
228    /// flush to disk and make the write durable.
229    pub fn append(
230        &mut self,
231        record_type: u16,
232        tenant_id: u32,
233        vshard_id: u16,
234        payload: &[u8],
235    ) -> Result<u64> {
236        if self.sealed {
237            return Err(WalError::Sealed);
238        }
239
240        let lsn = self.next_lsn.fetch_add(1, Ordering::Relaxed);
241        let record = WalRecord::new(
242            record_type,
243            lsn,
244            tenant_id,
245            vshard_id,
246            payload.to_vec(),
247            self.encryption_ring.as_ref().map(|r| r.current()),
248        )?;
249
250        // Write to double-write buffer (deferred — no fsync yet).
251        // The DWB is fsynced in batch during `sync()`, before the WAL fsync.
252        // This amortizes DWB fsync cost across the entire group commit batch.
253        //
254        // DWB failure is non-fatal for the write itself (the WAL is the
255        // authoritative store), but we log a warning because it means
256        // torn-write recovery is degraded. If the DWB is persistently
257        // broken, we detach it to avoid repeated error noise.
258        if let Some(dwb) = &mut self.double_write
259            && let Err(e) = dwb.write_record_deferred(&record)
260        {
261            tracing::warn!(
262                lsn = lsn,
263                error = %e,
264                "DWB write failed — torn-write protection degraded, detaching DWB"
265            );
266            self.double_write = None;
267        }
268
269        let header_bytes = record.header.to_bytes();
270        let total_size = HEADER_SIZE + record.payload.len();
271
272        // If this record doesn't fit in the remaining buffer, flush first.
273        if self.buffer.remaining() < total_size {
274            self.flush_buffer()?;
275        }
276
277        // If the record is larger than the entire buffer, we have a problem.
278        // This shouldn't happen with MAX_WAL_PAYLOAD_SIZE checks, but guard anyway.
279        if total_size > self.buffer.capacity() {
280            return Err(WalError::PayloadTooLarge {
281                size: record.payload.len(),
282                max: self.buffer.capacity() - HEADER_SIZE,
283            });
284        }
285
286        self.buffer.write(&header_bytes);
287        self.buffer.write(&record.payload);
288
289        Ok(lsn)
290    }
291
292    /// Flush the write buffer to disk (group commit).
293    ///
294    /// This issues a single write + fsync for all records accumulated
295    /// since the last flush. The DWB is also fsynced (one fsync for all
296    /// deferred DWB writes in this batch).
297    pub fn sync(&mut self) -> Result<()> {
298        if self.buffer.is_empty() {
299            return Ok(());
300        }
301        // Flush DWB first — records must be durable in DWB before WAL.
302        // If DWB flush fails, torn-write protection is lost for this batch.
303        // We log a warning and detach the DWB rather than silently continuing
304        // as if torn-write protection is active.
305        if let Some(dwb) = &mut self.double_write
306            && let Err(e) = dwb.flush()
307        {
308            tracing::warn!(
309                error = %e,
310                "DWB flush failed — torn-write protection lost for this batch, detaching DWB"
311            );
312            self.double_write = None;
313        }
314        self.flush_buffer()?;
315        self.file.sync_all()?;
316        Ok(())
317    }
318
319    /// Seal the WAL — no more writes will be accepted.
320    ///
321    /// Flushes any buffered data before sealing.
322    pub fn seal(&mut self) -> Result<()> {
323        self.sync()?;
324        self.sealed = true;
325        Ok(())
326    }
327
328    /// The next LSN that will be assigned.
329    pub fn next_lsn(&self) -> u64 {
330        self.next_lsn.load(Ordering::Relaxed)
331    }
332
333    /// Current file size (bytes written to disk).
334    pub fn file_offset(&self) -> u64 {
335        self.file_offset
336    }
337
338    /// Flush the aligned buffer to the file.
339    fn flush_buffer(&mut self) -> Result<()> {
340        if self.buffer.is_empty() {
341            return Ok(());
342        }
343
344        let data = if self.config.use_direct_io {
345            // O_DIRECT requires aligned I/O size.
346            self.buffer.as_aligned_slice()
347        } else {
348            // Without O_DIRECT, write only the actual data.
349            self.buffer.as_slice()
350        };
351
352        // Use pwrite to write at the exact offset, retrying on short writes.
353        #[cfg(unix)]
354        {
355            use std::os::unix::io::AsRawFd;
356            let fd = self.file.as_raw_fd();
357            let mut remaining = data;
358            let mut write_offset = self.file_offset;
359            while !remaining.is_empty() {
360                let written = unsafe {
361                    libc::pwrite(
362                        fd,
363                        remaining.as_ptr() as *const libc::c_void,
364                        remaining.len(),
365                        write_offset as libc::off_t,
366                    )
367                };
368                if written < 0 {
369                    return Err(WalError::Io(std::io::Error::last_os_error()));
370                }
371                let n = written as usize;
372                remaining = &remaining[n..];
373                write_offset += n as u64;
374            }
375        }
376
377        self.file_offset += data.len() as u64;
378        self.buffer.clear();
379        Ok(())
380    }
381}
382
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Build a buffered-I/O writer on a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned too, so the directory outlives the writer.
    fn scratch_writer() -> (tempfile::TempDir, WalWriter) {
        let dir = tempfile::tempdir().unwrap();
        let writer = WalWriter::open_without_direct_io(&dir.path().join("test.wal")).unwrap();
        (dir, writer)
    }

    #[test]
    fn write_and_sync_single_record() {
        let (_dir, mut writer) = scratch_writer();

        let assigned = writer
            .append(RecordType::Put as u16, 1, 0, b"hello")
            .unwrap();
        assert_eq!(assigned, 1);

        writer.sync().unwrap();
        assert!(writer.file_offset() > 0);
    }

    #[test]
    fn lsn_increments() {
        let (_dir, mut writer) = scratch_writer();

        let payloads: [&[u8]; 3] = [b"first", b"second", b"third"];
        let assigned: Vec<u64> = payloads
            .into_iter()
            .map(|p| writer.append(RecordType::Put as u16, 1, 0, p).unwrap())
            .collect();

        assert_eq!(assigned, vec![1, 2, 3]);
    }

    #[test]
    fn sealed_writer_rejects_writes() {
        let (_dir, mut writer) = scratch_writer();
        writer.seal().unwrap();

        let outcome = writer.append(RecordType::Put as u16, 1, 0, b"rejected");
        assert!(
            matches!(outcome, Err(WalError::Sealed)),
            "a sealed writer must reject appends"
        );
    }
}