// nodedb_wal/writer.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! WAL writer with O_DIRECT and group commit.
4//!
5//! The writer accumulates records into an aligned buffer and flushes to disk
6//! when the buffer is full or when an explicit sync is requested.
7//!
8//! ## I/O path
9//!
10//! 1. Caller creates a `WalRecord` and submits it to the writer.
11//! 2. Writer serializes the record into the aligned write buffer.
12//! 3. When the buffer is full or `sync()` is called, the buffer is written
13//!    to the WAL file via `O_DIRECT` + `fsync`.
14//! 4. Group commit: multiple concurrent writers can submit records between
15//!    syncs, and they all share a single `fsync` call.
16//!
17//! ## Future: io_uring
18//!
19//! The current implementation uses standard `pwrite` + `fsync` with O_DIRECT.
20//! io_uring submission can be added once the bridge crate provides the TPC
21//! event loop integration.
22
23use std::fs::{File, OpenOptions};
24use std::os::unix::fs::OpenOptionsExt;
25use std::path::Path;
26use std::sync::atomic::{AtomicU64, Ordering};
27
28use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT};
29use crate::double_write::DwbMode;
30use crate::error::{Result, WalError};
31use crate::preamble::SegmentPreamble;
32use crate::record::{HEADER_SIZE, WalRecord};
33
/// Default write buffer size: 2 MiB.
///
/// This is the batch size for group commit. Records accumulate here until
/// the buffer is full or `sync()` is called.
///
/// 2 MiB is a whole multiple of the typical 4096-byte `DEFAULT_ALIGNMENT`,
/// so a full buffer can be flushed as a single aligned O_DIRECT write.
///
/// Matches `WalTuning::write_buffer_size` default. Override via
/// `WalWriterConfig::write_buffer_size` at construction time.
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 2 * 1024 * 1024;
42
/// Configuration for the WAL writer.
///
/// Construct via struct literal with `..Default::default()` for unspecified
/// fields; see [`Default`] impl for the default values.
#[derive(Debug, Clone)]
pub struct WalWriterConfig {
    /// Size of the aligned write buffer (rounded up to alignment).
    /// Also the group-commit batch size — see `DEFAULT_WRITE_BUFFER_SIZE`.
    pub write_buffer_size: usize,

    /// O_DIRECT alignment (typically 4096 for NVMe).
    /// Passed to `AlignedBuf::new` for the write buffer's allocation.
    pub alignment: usize,

    /// Whether to use O_DIRECT. Set to `false` for testing on filesystems
    /// that don't support it (e.g., tmpfs).
    pub use_direct_io: bool,

    /// Double-write buffer I/O mode. `None` means "mirror the parent" —
    /// `Direct` when `use_direct_io` is true, `Buffered` otherwise.
    /// `Some(DwbMode::Off)` disables the DWB entirely.
    pub dwb_mode: Option<DwbMode>,
}
61
62impl Default for WalWriterConfig {
63    fn default() -> Self {
64        Self {
65            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
66            alignment: DEFAULT_ALIGNMENT,
67            use_direct_io: true,
68            dwb_mode: None,
69        }
70    }
71}
72
73fn resolve_dwb_mode(config: &WalWriterConfig) -> DwbMode {
74    config
75        .dwb_mode
76        .unwrap_or_else(|| DwbMode::default_for_parent(config.use_direct_io))
77}
78
79fn open_dwb_for(
80    config: &WalWriterConfig,
81    path: &Path,
82) -> Option<crate::double_write::DoubleWriteBuffer> {
83    let mode = resolve_dwb_mode(config);
84    if mode == DwbMode::Off {
85        return None;
86    }
87    let dwb_path = path.with_extension("dwb");
88    match crate::double_write::DoubleWriteBuffer::open(&dwb_path, mode) {
89        Ok(d) => Some(d),
90        Err(e) => {
91            tracing::warn!(
92                path = %dwb_path.display(),
93                error = %e,
94                mode = ?mode,
95                "failed to open DWB — torn-write protection disabled for this writer"
96            );
97            None
98        }
99    }
100}
101
/// Append-only WAL writer.
///
/// Records accumulate in an aligned in-memory buffer (`buffer`) and are
/// written to `file` at `file_offset` via `pwrite`; durability comes from
/// an explicit `sync()` (group commit — one fsync for the whole batch).
pub struct WalWriter {
    /// The WAL file handle (opened with O_DIRECT if configured).
    file: File,

    /// Aligned write buffer for batching records before flush.
    buffer: AlignedBuf,

    /// Current file write offset (always aligned).
    /// Under O_DIRECT this advances by the *aligned* (padded) batch length
    /// on each flush, not by the logical byte count.
    file_offset: u64,

    /// Next LSN to assign. Atomic fetch_add gives each `append` a unique,
    /// monotonically increasing LSN.
    next_lsn: AtomicU64,

    /// Whether the writer has been sealed (no more writes accepted).
    /// Set by `seal()`; checked at the top of `append`.
    sealed: bool,

    /// Configuration. Consulted at flush time for the O_DIRECT path choice.
    config: WalWriterConfig,

    /// Optional key ring for payload encryption (supports key rotation).
    /// `None` means payloads are written in plaintext.
    encryption_ring: Option<crate::crypto::KeyRing>,

    /// Preamble written at the start of this segment (when encryption is active).
    /// Its 16 bytes are included as part of the AAD on every encrypted record,
    /// binding ciphertext to the segment it was written in.
    segment_preamble: Option<SegmentPreamble>,

    /// Optional double-write buffer for torn write protection.
    /// Records are written here before the WAL for crash recovery.
    /// Detached (set to `None`) on persistent DWB failure.
    double_write: Option<crate::double_write::DoubleWriteBuffer>,
}
134
135impl WalWriter {
136    /// Open or create a WAL file at the given path.
137    pub fn open(path: &Path, config: WalWriterConfig) -> Result<Self> {
138        let mut opts = OpenOptions::new();
139        opts.create(true).write(true).append(false);
140
141        if config.use_direct_io {
142            // O_DIRECT: bypass page cache.
143            opts.custom_flags(libc::O_DIRECT);
144        }
145
146        let file = opts.open(path)?;
147
148        let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
149
150        // Scan existing WAL for recovery if the file has data.
151        let (file_offset, next_lsn) = if path.exists() && std::fs::metadata(path)?.len() > 0 {
152            let info = crate::recovery::recover(path)?;
153            (info.end_offset, info.next_lsn())
154        } else {
155            (0, 1)
156        };
157
158        let double_write = open_dwb_for(&config, path);
159
160        Ok(Self {
161            file,
162            buffer,
163            file_offset,
164            next_lsn: AtomicU64::new(next_lsn),
165            sealed: false,
166            config,
167            encryption_ring: None,
168            segment_preamble: None,
169            double_write,
170        })
171    }
172
173    /// Set the encryption key. When set, all subsequent records will have
174    /// their payloads encrypted with AES-256-GCM.
175    ///
176    /// Writes the 16-byte WAL segment preamble at the current file offset.
177    /// Must be called before the first `append`. Calling it after records
178    /// have already been written to this file returns an error.
179    pub fn set_encryption_key(&mut self, key: crate::crypto::WalEncryptionKey) -> Result<()> {
180        self.set_encryption_ring(crate::crypto::KeyRing::new(key))
181    }
182
183    /// Set the key ring directly (for key rotation with dual-key reads).
184    ///
185    /// Writes the 16-byte WAL segment preamble at the current file offset.
186    /// Must be called before the first `append`. Calling it after records
187    /// have already been written to this file returns an error.
188    pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) -> Result<()> {
189        if self.file_offset != 0 || !self.buffer.is_empty() {
190            return Err(WalError::EncryptionError {
191                detail: "set_encryption_ring must be called before writing any records".into(),
192            });
193        }
194        let epoch = *ring.current().epoch();
195        let preamble = SegmentPreamble::new_wal(epoch);
196        let preamble_bytes = preamble.to_bytes();
197
198        // Write preamble into the buffer so it gets flushed with the first
199        // record batch (or on the next sync).
200        self.buffer.write(&preamble_bytes);
201
202        self.encryption_ring = Some(ring);
203        self.segment_preamble = Some(preamble);
204        Ok(())
205    }
206
207    /// Access the key ring (for decryption during replay).
208    pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
209        self.encryption_ring.as_ref()
210    }
211
212    /// The preamble for this segment, if encryption was enabled.
213    pub fn segment_preamble(&self) -> Option<&SegmentPreamble> {
214        self.segment_preamble.as_ref()
215    }
216
217    /// Open a new WAL segment file with a specific starting LSN.
218    ///
219    /// Used by `SegmentedWal` when rolling to a new segment. The file must
220    /// not already exist (or be empty). The writer will assign LSNs starting
221    /// from `start_lsn`.
222    pub fn open_with_start_lsn(
223        path: &Path,
224        config: WalWriterConfig,
225        start_lsn: u64,
226    ) -> Result<Self> {
227        let mut opts = OpenOptions::new();
228        opts.create(true).write(true).append(false);
229
230        if config.use_direct_io {
231            opts.custom_flags(libc::O_DIRECT);
232        }
233
234        let file = opts.open(path)?;
235        let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
236
237        let double_write = open_dwb_for(&config, path);
238
239        Ok(Self {
240            file,
241            buffer,
242            file_offset: 0,
243            next_lsn: AtomicU64::new(start_lsn),
244            sealed: false,
245            config,
246            encryption_ring: None,
247            segment_preamble: None,
248            double_write,
249        })
250    }
251
252    /// Open a WAL writer with O_DIRECT disabled (for testing on tmpfs, etc.).
253    pub fn open_without_direct_io(path: &Path) -> Result<Self> {
254        Self::open(
255            path,
256            WalWriterConfig {
257                use_direct_io: false,
258                ..Default::default()
259            },
260        )
261    }
262
263    /// Append a record to the WAL. Returns the assigned LSN.
264    ///
265    /// The record is written to the in-memory buffer. Call `sync()` to
266    /// flush to disk and make the write durable.
267    ///
268    /// `database_id` is stored in header bytes 34-41. Pass `0` for the
269    /// default database (backward-compatible with pre-existing records).
270    pub fn append(
271        &mut self,
272        record_type: u32,
273        tenant_id: u64,
274        vshard_id: u32,
275        database_id: u64,
276        payload: &[u8],
277    ) -> Result<u64> {
278        if self.sealed {
279            return Err(WalError::Sealed);
280        }
281
282        let lsn = self.next_lsn.fetch_add(1, Ordering::Relaxed);
283        let preamble_bytes = self.segment_preamble.as_ref().map(|p| p.to_bytes());
284        let record = WalRecord::new(
285            record_type,
286            lsn,
287            tenant_id,
288            vshard_id,
289            database_id,
290            payload.to_vec(),
291            self.encryption_ring.as_ref().map(|r| r.current()),
292            preamble_bytes.as_ref(),
293        )?;
294
295        // Write to double-write buffer (deferred — no fsync yet).
296        // The DWB is fsynced in batch during `sync()`, before the WAL fsync.
297        // This amortizes DWB fsync cost across the entire group commit batch.
298        //
299        // DWB failure is non-fatal for the write itself (the WAL is the
300        // authoritative store), but we log a warning because it means
301        // torn-write recovery is degraded. If the DWB is persistently
302        // broken, we detach it to avoid repeated error noise.
303        if let Some(dwb) = &mut self.double_write
304            && let Err(e) = dwb.write_record_deferred(&record)
305        {
306            tracing::warn!(
307                lsn = lsn,
308                error = %e,
309                "DWB write failed — torn-write protection degraded, detaching DWB"
310            );
311            self.double_write = None;
312        }
313
314        let header_bytes = record.header.to_bytes();
315        let total_size = HEADER_SIZE + record.payload.len();
316
317        // If this record doesn't fit in the remaining buffer, flush first.
318        if self.buffer.remaining() < total_size {
319            self.flush_buffer()?;
320        }
321
322        // If the record is larger than the entire buffer, we have a problem.
323        // This shouldn't happen with MAX_WAL_PAYLOAD_SIZE checks, but guard anyway.
324        if total_size > self.buffer.capacity() {
325            return Err(WalError::PayloadTooLarge {
326                size: record.payload.len(),
327                max: self.buffer.capacity() - HEADER_SIZE,
328            });
329        }
330
331        self.buffer.write(&header_bytes);
332        self.buffer.write(&record.payload);
333
334        Ok(lsn)
335    }
336
337    /// Flush the write buffer to disk (group commit).
338    ///
339    /// This issues a single write + fsync for all records accumulated
340    /// since the last flush. The DWB is also fsynced (one fsync for all
341    /// deferred DWB writes in this batch).
342    pub fn sync(&mut self) -> Result<()> {
343        if self.buffer.is_empty() {
344            return Ok(());
345        }
346        // Flush DWB first — records must be durable in DWB before WAL.
347        // If DWB flush fails, torn-write protection is lost for this batch.
348        // We log a warning and detach the DWB rather than silently continuing
349        // as if torn-write protection is active.
350        if let Some(dwb) = &mut self.double_write
351            && let Err(e) = dwb.flush()
352        {
353            tracing::warn!(
354                error = %e,
355                "DWB flush failed — torn-write protection lost for this batch, detaching DWB"
356            );
357            self.double_write = None;
358        }
359        self.flush_buffer()?;
360        self.file.sync_all()?;
361        Ok(())
362    }
363
364    /// Seal the WAL — no more writes will be accepted.
365    ///
366    /// Flushes any buffered data before sealing.
367    pub fn seal(&mut self) -> Result<()> {
368        self.sync()?;
369        self.sealed = true;
370        Ok(())
371    }
372
373    /// The next LSN that will be assigned.
374    pub fn next_lsn(&self) -> u64 {
375        self.next_lsn.load(Ordering::Relaxed)
376    }
377
378    /// Current file size (bytes written to disk).
379    pub fn file_offset(&self) -> u64 {
380        self.file_offset
381    }
382
383    /// Flush the aligned buffer to the file.
384    fn flush_buffer(&mut self) -> Result<()> {
385        if self.buffer.is_empty() {
386            return Ok(());
387        }
388
389        let data = if self.config.use_direct_io {
390            // O_DIRECT requires aligned I/O size.
391            self.buffer.as_aligned_slice()
392        } else {
393            // Without O_DIRECT, write only the actual data.
394            self.buffer.as_slice()
395        };
396
397        // Use pwrite to write at the exact offset, retrying on short writes.
398        #[cfg(unix)]
399        {
400            use std::os::unix::io::AsRawFd;
401            let fd = self.file.as_raw_fd();
402            let mut remaining = data;
403            let mut write_offset = self.file_offset;
404            while !remaining.is_empty() {
405                let written = unsafe {
406                    libc::pwrite(
407                        fd,
408                        remaining.as_ptr() as *const libc::c_void,
409                        remaining.len(),
410                        write_offset as libc::off_t,
411                    )
412                };
413                if written < 0 {
414                    return Err(WalError::Io(std::io::Error::last_os_error()));
415                }
416                let n = written as usize;
417                remaining = &remaining[n..];
418                write_offset += n as u64;
419            }
420        }
421
422        self.file_offset += data.len() as u64;
423        self.buffer.clear();
424        Ok(())
425    }
426}
427
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Open a buffered (non-O_DIRECT) writer on a fresh temp file.
    /// Returns the tempdir so it stays alive for the test's duration.
    fn temp_writer() -> (tempfile::TempDir, WalWriter) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");
        let writer = WalWriter::open_without_direct_io(&path).unwrap();
        (dir, writer)
    }

    #[test]
    fn write_and_sync_single_record() {
        let (_dir, mut writer) = temp_writer();

        let lsn = writer
            .append(RecordType::Put as u32, 1, 0, 0, b"hello")
            .unwrap();
        assert_eq!(lsn, 1);

        writer.sync().unwrap();
        assert!(writer.file_offset() > 0);
    }

    #[test]
    fn lsn_increments() {
        let (_dir, mut writer) = temp_writer();

        let payloads: [&[u8]; 3] = [b"first", b"second", b"third"];
        for (i, payload) in payloads.iter().enumerate() {
            let lsn = writer
                .append(RecordType::Put as u32, 1, 0, 0, payload)
                .unwrap();
            assert_eq!(lsn, (i as u64) + 1);
        }
    }

    #[test]
    fn sealed_writer_rejects_writes() {
        let (_dir, mut writer) = temp_writer();
        writer.seal().unwrap();

        let result = writer.append(RecordType::Put as u32, 1, 0, 0, b"rejected");
        assert!(matches!(result, Err(WalError::Sealed)));
    }
}