Skip to main content

nodedb_wal/
writer.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! WAL writer with O_DIRECT and group commit.
4//!
5//! The writer accumulates records into an aligned buffer and flushes to disk
6//! when the buffer is full or when an explicit sync is requested.
7//!
8//! ## I/O path
9//!
10//! 1. Caller creates a `WalRecord` and submits it to the writer.
11//! 2. Writer serializes the record into the aligned write buffer.
12//! 3. When the buffer is full or `sync()` is called, the buffer is written
13//!    to the WAL file via `O_DIRECT` + `fsync`.
14//! 4. Group commit: multiple concurrent writers can submit records between
15//!    syncs, and they all share a single `fsync` call.
16//!
17//! ## Future: io_uring
18//!
19//! The current implementation uses standard `pwrite` + `fsync` with O_DIRECT.
20//! io_uring submission can be added once the bridge crate provides the TPC
21//! event loop integration.
22
23use std::fs::{File, OpenOptions};
24use std::os::unix::fs::OpenOptionsExt;
25use std::path::Path;
26use std::sync::atomic::{AtomicU64, Ordering};
27
28use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT};
29use crate::double_write::DwbMode;
30use crate::error::{Result, WalError};
31use crate::preamble::SegmentPreamble;
32use crate::record::{HEADER_SIZE, WalRecord};
33
/// Default capacity of the writer's staging buffer: 2 MiB (2 × 2^20 bytes).
///
/// This is the group-commit batch size: appended records pile up in the
/// buffer until it cannot hold the next record or `sync()` is called.
///
/// Matches `WalTuning::write_buffer_size` default. Override via
/// `WalWriterConfig::write_buffer_size` at construction time.
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 2 << 20;
42
/// Configuration for the WAL writer.
///
/// Taken by value by `WalWriter::open` and `WalWriter::open_with_start_lsn`
/// and kept inside the writer for its whole lifetime.
#[derive(Debug, Clone)]
pub struct WalWriterConfig {
    /// Size of the aligned write buffer (rounded up to alignment).
    ///
    /// Passed to `AlignedBuf::new` together with `alignment`. A single record
    /// (header + payload) larger than the resulting capacity is rejected by
    /// `append` with `WalError::PayloadTooLarge`.
    pub write_buffer_size: usize,

    /// O_DIRECT alignment (typically 4096 for NVMe).
    pub alignment: usize,

    /// Whether to use O_DIRECT. Set to `false` for testing on filesystems
    /// that don't support it (e.g., tmpfs).
    ///
    /// When `true` the file is opened with `libc::O_DIRECT` and each flush
    /// writes the buffer's aligned slice; when `false` only the bytes that
    /// were actually staged are written.
    pub use_direct_io: bool,

    /// Double-write buffer I/O mode. `None` means "mirror the parent" —
    /// `Direct` when `use_direct_io` is true, `Buffered` otherwise.
    /// `Some(DwbMode::Off)` disables the DWB entirely.
    ///
    /// The `None` case is resolved through `DwbMode::default_for_parent`.
    pub dwb_mode: Option<DwbMode>,
}
61
62impl Default for WalWriterConfig {
63    fn default() -> Self {
64        Self {
65            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
66            alignment: DEFAULT_ALIGNMENT,
67            use_direct_io: true,
68            dwb_mode: None,
69        }
70    }
71}
72
73fn resolve_dwb_mode(config: &WalWriterConfig) -> DwbMode {
74    config
75        .dwb_mode
76        .unwrap_or_else(|| DwbMode::default_for_parent(config.use_direct_io))
77}
78
79fn open_dwb_for(
80    config: &WalWriterConfig,
81    path: &Path,
82) -> Option<crate::double_write::DoubleWriteBuffer> {
83    let mode = resolve_dwb_mode(config);
84    if mode == DwbMode::Off {
85        return None;
86    }
87    let dwb_path = path.with_extension("dwb");
88    match crate::double_write::DoubleWriteBuffer::open(&dwb_path, mode) {
89        Ok(d) => Some(d),
90        Err(e) => {
91            tracing::warn!(
92                path = %dwb_path.display(),
93                error = %e,
94                mode = ?mode,
95                "failed to open DWB — torn-write protection disabled for this writer"
96            );
97            None
98        }
99    }
100}
101
/// Append-only WAL writer.
///
/// `append` stages records in `buffer`; the bytes reach `file` when the
/// buffer is flushed, either because the next record does not fit or because
/// `sync`/`seal` was called. Only `sync` (and therefore `seal`) issues an
/// fsync.
pub struct WalWriter {
    /// The WAL file handle (opened with O_DIRECT if configured).
    file: File,

    /// Aligned write buffer for batching records before flush.
    buffer: AlignedBuf,

    /// Offset in `file` at which the next flush writes. Advanced by the
    /// length of every flushed slice: with `use_direct_io` that is the
    /// buffer's aligned slice, otherwise only the bytes actually staged, so
    /// the value is not necessarily alignment-sized in buffered mode.
    file_offset: u64,

    /// Next LSN to assign. Starts at 1 for an empty file, at the recovered
    /// value for an existing one, or at the caller-supplied start LSN.
    next_lsn: AtomicU64,

    /// Whether the writer has been sealed (no more writes accepted).
    sealed: bool,

    /// Configuration.
    config: WalWriterConfig,

    /// Optional key ring for payload encryption (supports key rotation).
    /// Its current key is handed to `WalRecord::new` on every append.
    encryption_ring: Option<crate::crypto::KeyRing>,

    /// Preamble written at the start of this segment (when encryption is active).
    /// Its 16 bytes are included as part of the AAD on every encrypted record,
    /// binding ciphertext to the segment it was written in.
    segment_preamble: Option<SegmentPreamble>,

    /// Optional double-write buffer for torn write protection.
    /// Records are written here before the WAL for crash recovery.
    /// Reset to `None` (detached) after the first DWB write or flush error.
    double_write: Option<crate::double_write::DoubleWriteBuffer>,
}
134
135impl WalWriter {
136    /// Open or create a WAL file at the given path.
137    pub fn open(path: &Path, config: WalWriterConfig) -> Result<Self> {
138        let mut opts = OpenOptions::new();
139        opts.create(true).write(true).append(false);
140
141        if config.use_direct_io {
142            // O_DIRECT: bypass page cache.
143            opts.custom_flags(libc::O_DIRECT);
144        }
145
146        let file = opts.open(path)?;
147
148        let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
149
150        // Scan existing WAL for recovery if the file has data.
151        let (file_offset, next_lsn) = if path.exists() && std::fs::metadata(path)?.len() > 0 {
152            let info = crate::recovery::recover(path)?;
153            (info.end_offset, info.next_lsn())
154        } else {
155            (0, 1)
156        };
157
158        let double_write = open_dwb_for(&config, path);
159
160        Ok(Self {
161            file,
162            buffer,
163            file_offset,
164            next_lsn: AtomicU64::new(next_lsn),
165            sealed: false,
166            config,
167            encryption_ring: None,
168            segment_preamble: None,
169            double_write,
170        })
171    }
172
173    /// Set the encryption key. When set, all subsequent records will have
174    /// their payloads encrypted with AES-256-GCM.
175    ///
176    /// Writes the 16-byte WAL segment preamble at the current file offset.
177    /// Must be called before the first `append`. Calling it after records
178    /// have already been written to this file returns an error.
179    pub fn set_encryption_key(&mut self, key: crate::crypto::WalEncryptionKey) -> Result<()> {
180        self.set_encryption_ring(crate::crypto::KeyRing::new(key))
181    }
182
183    /// Set the key ring directly (for key rotation with dual-key reads).
184    ///
185    /// Writes the 16-byte WAL segment preamble at the current file offset.
186    /// Must be called before the first `append`. Calling it after records
187    /// have already been written to this file returns an error.
188    pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) -> Result<()> {
189        if self.file_offset != 0 || !self.buffer.is_empty() {
190            return Err(WalError::EncryptionError {
191                detail: "set_encryption_ring must be called before writing any records".into(),
192            });
193        }
194        let epoch = *ring.current().epoch();
195        let preamble = SegmentPreamble::new_wal(epoch);
196        let preamble_bytes = preamble.to_bytes();
197
198        // Write preamble into the buffer so it gets flushed with the first
199        // record batch (or on the next sync).
200        self.buffer.write(&preamble_bytes);
201
202        self.encryption_ring = Some(ring);
203        self.segment_preamble = Some(preamble);
204        Ok(())
205    }
206
    /// Access the key ring (for decryption during replay).
    ///
    /// Returns `None` until `set_encryption_key` or `set_encryption_ring`
    /// has succeeded on this writer.
    pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
        self.encryption_ring.as_ref()
    }
211
    /// The preamble for this segment, if encryption was enabled.
    ///
    /// Set together with the key ring by `set_encryption_ring`; `None` for
    /// an unencrypted writer.
    pub fn segment_preamble(&self) -> Option<&SegmentPreamble> {
        self.segment_preamble.as_ref()
    }
216
217    /// Open a new WAL segment file with a specific starting LSN.
218    ///
219    /// Used by `SegmentedWal` when rolling to a new segment. The file must
220    /// not already exist (or be empty). The writer will assign LSNs starting
221    /// from `start_lsn`.
222    pub fn open_with_start_lsn(
223        path: &Path,
224        config: WalWriterConfig,
225        start_lsn: u64,
226    ) -> Result<Self> {
227        let mut opts = OpenOptions::new();
228        opts.create(true).write(true).append(false);
229
230        if config.use_direct_io {
231            opts.custom_flags(libc::O_DIRECT);
232        }
233
234        let file = opts.open(path)?;
235        let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
236
237        let double_write = open_dwb_for(&config, path);
238
239        Ok(Self {
240            file,
241            buffer,
242            file_offset: 0,
243            next_lsn: AtomicU64::new(start_lsn),
244            sealed: false,
245            config,
246            encryption_ring: None,
247            segment_preamble: None,
248            double_write,
249        })
250    }
251
252    /// Open a WAL writer with O_DIRECT disabled (for testing on tmpfs, etc.).
253    pub fn open_without_direct_io(path: &Path) -> Result<Self> {
254        Self::open(
255            path,
256            WalWriterConfig {
257                use_direct_io: false,
258                ..Default::default()
259            },
260        )
261    }
262
263    /// Append a record to the WAL. Returns the assigned LSN.
264    ///
265    /// The record is written to the in-memory buffer. Call `sync()` to
266    /// flush to disk and make the write durable.
267    pub fn append(
268        &mut self,
269        record_type: u32,
270        tenant_id: u64,
271        vshard_id: u32,
272        payload: &[u8],
273    ) -> Result<u64> {
274        if self.sealed {
275            return Err(WalError::Sealed);
276        }
277
278        let lsn = self.next_lsn.fetch_add(1, Ordering::Relaxed);
279        let preamble_bytes = self.segment_preamble.as_ref().map(|p| p.to_bytes());
280        let record = WalRecord::new(
281            record_type,
282            lsn,
283            tenant_id,
284            vshard_id,
285            payload.to_vec(),
286            self.encryption_ring.as_ref().map(|r| r.current()),
287            preamble_bytes.as_ref(),
288        )?;
289
290        // Write to double-write buffer (deferred — no fsync yet).
291        // The DWB is fsynced in batch during `sync()`, before the WAL fsync.
292        // This amortizes DWB fsync cost across the entire group commit batch.
293        //
294        // DWB failure is non-fatal for the write itself (the WAL is the
295        // authoritative store), but we log a warning because it means
296        // torn-write recovery is degraded. If the DWB is persistently
297        // broken, we detach it to avoid repeated error noise.
298        if let Some(dwb) = &mut self.double_write
299            && let Err(e) = dwb.write_record_deferred(&record)
300        {
301            tracing::warn!(
302                lsn = lsn,
303                error = %e,
304                "DWB write failed — torn-write protection degraded, detaching DWB"
305            );
306            self.double_write = None;
307        }
308
309        let header_bytes = record.header.to_bytes();
310        let total_size = HEADER_SIZE + record.payload.len();
311
312        // If this record doesn't fit in the remaining buffer, flush first.
313        if self.buffer.remaining() < total_size {
314            self.flush_buffer()?;
315        }
316
317        // If the record is larger than the entire buffer, we have a problem.
318        // This shouldn't happen with MAX_WAL_PAYLOAD_SIZE checks, but guard anyway.
319        if total_size > self.buffer.capacity() {
320            return Err(WalError::PayloadTooLarge {
321                size: record.payload.len(),
322                max: self.buffer.capacity() - HEADER_SIZE,
323            });
324        }
325
326        self.buffer.write(&header_bytes);
327        self.buffer.write(&record.payload);
328
329        Ok(lsn)
330    }
331
332    /// Flush the write buffer to disk (group commit).
333    ///
334    /// This issues a single write + fsync for all records accumulated
335    /// since the last flush. The DWB is also fsynced (one fsync for all
336    /// deferred DWB writes in this batch).
337    pub fn sync(&mut self) -> Result<()> {
338        if self.buffer.is_empty() {
339            return Ok(());
340        }
341        // Flush DWB first — records must be durable in DWB before WAL.
342        // If DWB flush fails, torn-write protection is lost for this batch.
343        // We log a warning and detach the DWB rather than silently continuing
344        // as if torn-write protection is active.
345        if let Some(dwb) = &mut self.double_write
346            && let Err(e) = dwb.flush()
347        {
348            tracing::warn!(
349                error = %e,
350                "DWB flush failed — torn-write protection lost for this batch, detaching DWB"
351            );
352            self.double_write = None;
353        }
354        self.flush_buffer()?;
355        self.file.sync_all()?;
356        Ok(())
357    }
358
359    /// Seal the WAL — no more writes will be accepted.
360    ///
361    /// Flushes any buffered data before sealing.
362    pub fn seal(&mut self) -> Result<()> {
363        self.sync()?;
364        self.sealed = true;
365        Ok(())
366    }
367
    /// The next LSN that will be assigned.
    ///
    /// Read with relaxed ordering; `append` advances the counter by one for
    /// every record it attempts to build.
    pub fn next_lsn(&self) -> u64 {
        self.next_lsn.load(Ordering::Relaxed)
    }
372
    /// Offset at which the next flush will write, i.e. the number of bytes
    /// this writer considers written to the file.
    ///
    /// Records still sitting in the in-memory buffer are not counted. With
    /// direct I/O each flush advances the offset by the length of the
    /// buffer's aligned slice rather than by the record bytes alone.
    pub fn file_offset(&self) -> u64 {
        self.file_offset
    }
377
378    /// Flush the aligned buffer to the file.
379    fn flush_buffer(&mut self) -> Result<()> {
380        if self.buffer.is_empty() {
381            return Ok(());
382        }
383
384        let data = if self.config.use_direct_io {
385            // O_DIRECT requires aligned I/O size.
386            self.buffer.as_aligned_slice()
387        } else {
388            // Without O_DIRECT, write only the actual data.
389            self.buffer.as_slice()
390        };
391
392        // Use pwrite to write at the exact offset, retrying on short writes.
393        #[cfg(unix)]
394        {
395            use std::os::unix::io::AsRawFd;
396            let fd = self.file.as_raw_fd();
397            let mut remaining = data;
398            let mut write_offset = self.file_offset;
399            while !remaining.is_empty() {
400                let written = unsafe {
401                    libc::pwrite(
402                        fd,
403                        remaining.as_ptr() as *const libc::c_void,
404                        remaining.len(),
405                        write_offset as libc::off_t,
406                    )
407                };
408                if written < 0 {
409                    return Err(WalError::Io(std::io::Error::last_os_error()));
410                }
411                let n = written as usize;
412                remaining = &remaining[n..];
413                write_offset += n as u64;
414            }
415        }
416
417        self.file_offset += data.len() as u64;
418        self.buffer.clear();
419        Ok(())
420    }
421}
422
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Create a buffered-I/O writer on `test.wal` inside a fresh temp dir.
    ///
    /// The `TempDir` guard is returned alongside the writer so the directory
    /// outlives it; bind it to a named variable, not `_`.
    fn scratch_writer() -> (tempfile::TempDir, WalWriter) {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");
        let writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
        (dir, writer)
    }

    /// A single appended record gets LSN 1 and reaches the file on sync.
    #[test]
    fn write_and_sync_single_record() {
        let (_dir, mut writer) = scratch_writer();

        let lsn = writer
            .append(RecordType::Put as u32, 1, 0, b"hello")
            .unwrap();
        assert_eq!(lsn, 1);

        writer.sync().unwrap();
        assert!(writer.file_offset() > 0);
    }

    /// Consecutive appends on a fresh file are numbered 1, 2, 3.
    #[test]
    fn lsn_increments() {
        let (_dir, mut writer) = scratch_writer();

        let payloads: [&[u8]; 3] = [b"first", b"second", b"third"];
        let lsns: Vec<u64> = payloads
            .into_iter()
            .map(|payload| {
                writer
                    .append(RecordType::Put as u32, 1, 0, payload)
                    .unwrap()
            })
            .collect();

        assert_eq!(lsns, [1, 2, 3]);
    }

    /// After `seal`, `append` fails with `WalError::Sealed`.
    #[test]
    fn sealed_writer_rejects_writes() {
        let (_dir, mut writer) = scratch_writer();
        writer.seal().unwrap();

        let outcome = writer.append(RecordType::Put as u32, 1, 0, b"rejected");
        assert!(matches!(outcome, Err(WalError::Sealed)));
    }
}