Skip to main content

nodedb_wal/
writer.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! WAL writer with O_DIRECT and group commit.
4//!
5//! The writer accumulates records into an aligned buffer and flushes to disk
6//! when the buffer is full or when an explicit sync is requested.
7//!
8//! ## I/O path
9//!
10//! 1. Caller creates a `WalRecord` and submits it to the writer.
11//! 2. Writer serializes the record into the aligned write buffer.
12//! 3. When the buffer is full or `sync()` is called, the buffer is written
13//!    to the WAL file via `O_DIRECT` + `fsync`.
14//! 4. Group commit: multiple concurrent writers can submit records between
15//!    syncs, and they all share a single `fsync` call.
16//!
17//! ## Future: io_uring
18//!
19//! The current implementation uses standard `pwrite` + `fsync` with O_DIRECT.
20//! io_uring submission can be added once the bridge crate provides the TPC
21//! event loop integration.
22
23use std::fs::{File, OpenOptions};
24use std::path::Path;
25use std::sync::atomic::{AtomicU64, Ordering};
26
27#[cfg(not(target_arch = "wasm32"))]
28use std::os::unix::fs::OpenOptionsExt as _;
29
30use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT};
31use crate::double_write::DwbMode;
32use crate::error::{Result, WalError};
33use crate::preamble::SegmentPreamble;
34use crate::record::{HEADER_SIZE, WalRecord};
35
/// Default capacity of the writer's in-memory batch buffer: 2 MiB.
///
/// Records pile up in this buffer until it fills or `sync()` is invoked, so
/// it is effectively the group-commit batch size.
///
/// Mirrors the `WalTuning::write_buffer_size` default. Set
/// `WalWriterConfig::write_buffer_size` when constructing a writer to use a
/// different size.
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 2 << 20;
44
/// Configuration for the WAL writer.
///
/// Passed by value to the `WalWriter` constructors; the writer stores it and
/// consults `use_direct_io` on every buffer flush.
#[derive(Debug, Clone)]
pub struct WalWriterConfig {
    /// Size of the aligned write buffer (rounded up to alignment).
    ///
    /// Handed to `AlignedBuf::new` together with `alignment`; the rounding
    /// itself happens there (not visible in this file — TODO confirm).
    pub write_buffer_size: usize,

    /// O_DIRECT alignment (typically 4096 for NVMe).
    pub alignment: usize,

    /// Whether to use O_DIRECT. Set to `false` for testing on filesystems
    /// that don't support it (e.g., tmpfs).
    ///
    /// Besides the open flag, this also selects which buffer view is flushed:
    /// the aligned slice when `true`, only the bytes in use when `false`.
    pub use_direct_io: bool,

    /// Double-write buffer I/O mode. `None` means "mirror the parent" —
    /// `Direct` when `use_direct_io` is true, `Buffered` otherwise
    /// (resolved through `DwbMode::default_for_parent`).
    /// `Some(DwbMode::Off)` disables the DWB entirely.
    pub dwb_mode: Option<DwbMode>,
}
63
64impl Default for WalWriterConfig {
65    fn default() -> Self {
66        Self {
67            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
68            alignment: DEFAULT_ALIGNMENT,
69            use_direct_io: true,
70            dwb_mode: None,
71        }
72    }
73}
74
75fn resolve_dwb_mode(config: &WalWriterConfig) -> DwbMode {
76    config
77        .dwb_mode
78        .unwrap_or_else(|| DwbMode::default_for_parent(config.use_direct_io))
79}
80
81fn open_dwb_for(
82    config: &WalWriterConfig,
83    path: &Path,
84) -> Option<crate::double_write::DoubleWriteBuffer> {
85    let mode = resolve_dwb_mode(config);
86    if mode == DwbMode::Off {
87        return None;
88    }
89    let dwb_path = path.with_extension("dwb");
90    match crate::double_write::DoubleWriteBuffer::open(&dwb_path, mode) {
91        Ok(d) => Some(d),
92        Err(e) => {
93            tracing::warn!(
94                path = %dwb_path.display(),
95                error = %e,
96                mode = ?mode,
97                "failed to open DWB — torn-write protection disabled for this writer"
98            );
99            None
100        }
101    }
102}
103
/// Append-only WAL writer.
///
/// Records are serialized into `buffer` by `append` and written to `file`
/// at `file_offset` when the buffer fills or `sync()` is called.
pub struct WalWriter {
    /// The WAL file handle (opened with O_DIRECT if configured).
    file: File,

    /// Aligned write buffer for batching records before flush.
    buffer: AlignedBuf,

    /// Offset in `file` at which the next flush will write.
    ///
    /// Advances by the length of each flushed slice. With O_DIRECT that is
    /// the aligned slice, so the offset is expected to stay aligned; in
    /// buffered mode it advances by the exact number of bytes in use and is
    /// generally NOT aligned.
    file_offset: u64,

    /// Next LSN to assign. Stored as an atomic, although every mutation in
    /// this file goes through `&mut self`.
    next_lsn: AtomicU64,

    /// Whether the writer has been sealed (no more writes accepted).
    sealed: bool,

    /// Configuration.
    config: WalWriterConfig,

    /// Optional key ring for payload encryption (supports key rotation).
    encryption_ring: Option<crate::crypto::KeyRing>,

    /// Preamble written at the start of this segment (when encryption is active).
    /// Its 16 bytes are included as part of the AAD on every encrypted record,
    /// binding ciphertext to the segment it was written in.
    segment_preamble: Option<SegmentPreamble>,

    /// Optional double-write buffer for torn write protection.
    /// Records are written here before the WAL for crash recovery.
    /// Reset to `None` (detached) if a DWB write or flush fails.
    double_write: Option<crate::double_write::DoubleWriteBuffer>,
}
136
137impl WalWriter {
138    /// Open or create a WAL file at the given path.
139    pub fn open(path: &Path, config: WalWriterConfig) -> Result<Self> {
140        let mut opts = OpenOptions::new();
141        opts.create(true).write(true).append(false);
142
143        #[cfg(not(target_arch = "wasm32"))]
144        if config.use_direct_io {
145            // O_DIRECT: bypass page cache.
146            opts.custom_flags(libc::O_DIRECT);
147        }
148
149        let file = opts.open(path)?;
150
151        let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
152
153        // Scan existing WAL for recovery if the file has data.
154        let (file_offset, next_lsn) = if path.exists() && std::fs::metadata(path)?.len() > 0 {
155            let info = crate::recovery::recover(path)?;
156            (info.end_offset, info.next_lsn())
157        } else {
158            (0, 1)
159        };
160
161        let double_write = open_dwb_for(&config, path);
162
163        Ok(Self {
164            file,
165            buffer,
166            file_offset,
167            next_lsn: AtomicU64::new(next_lsn),
168            sealed: false,
169            config,
170            encryption_ring: None,
171            segment_preamble: None,
172            double_write,
173        })
174    }
175
176    /// Set the encryption key. When set, all subsequent records will have
177    /// their payloads encrypted with AES-256-GCM.
178    ///
179    /// Writes the 16-byte WAL segment preamble at the current file offset.
180    /// Must be called before the first `append`. Calling it after records
181    /// have already been written to this file returns an error.
182    pub fn set_encryption_key(&mut self, key: crate::crypto::WalEncryptionKey) -> Result<()> {
183        self.set_encryption_ring(crate::crypto::KeyRing::new(key))
184    }
185
186    /// Set the key ring directly (for key rotation with dual-key reads).
187    ///
188    /// Writes the 16-byte WAL segment preamble at the current file offset.
189    /// Must be called before the first `append`. Calling it after records
190    /// have already been written to this file returns an error.
191    pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) -> Result<()> {
192        if self.file_offset != 0 || !self.buffer.is_empty() {
193            return Err(WalError::EncryptionError {
194                detail: "set_encryption_ring must be called before writing any records".into(),
195            });
196        }
197        let epoch = *ring.current().epoch();
198        let preamble = SegmentPreamble::new_wal(epoch);
199        let preamble_bytes = preamble.to_bytes();
200
201        // Write preamble into the buffer so it gets flushed with the first
202        // record batch (or on the next sync).
203        self.buffer.write(&preamble_bytes);
204
205        self.encryption_ring = Some(ring);
206        self.segment_preamble = Some(preamble);
207        Ok(())
208    }
209
210    /// Access the key ring (for decryption during replay).
211    pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
212        self.encryption_ring.as_ref()
213    }
214
215    /// The preamble for this segment, if encryption was enabled.
216    pub fn segment_preamble(&self) -> Option<&SegmentPreamble> {
217        self.segment_preamble.as_ref()
218    }
219
220    /// Open a new WAL segment file with a specific starting LSN.
221    ///
222    /// Used by `SegmentedWal` when rolling to a new segment. The file must
223    /// not already exist (or be empty). The writer will assign LSNs starting
224    /// from `start_lsn`.
225    pub fn open_with_start_lsn(
226        path: &Path,
227        config: WalWriterConfig,
228        start_lsn: u64,
229    ) -> Result<Self> {
230        let mut opts = OpenOptions::new();
231        opts.create(true).write(true).append(false);
232
233        #[cfg(not(target_arch = "wasm32"))]
234        if config.use_direct_io {
235            opts.custom_flags(libc::O_DIRECT);
236        }
237
238        let file = opts.open(path)?;
239        let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
240
241        let double_write = open_dwb_for(&config, path);
242
243        Ok(Self {
244            file,
245            buffer,
246            file_offset: 0,
247            next_lsn: AtomicU64::new(start_lsn),
248            sealed: false,
249            config,
250            encryption_ring: None,
251            segment_preamble: None,
252            double_write,
253        })
254    }
255
256    /// Open a WAL writer with O_DIRECT disabled (for testing on tmpfs, etc.).
257    pub fn open_without_direct_io(path: &Path) -> Result<Self> {
258        Self::open(
259            path,
260            WalWriterConfig {
261                use_direct_io: false,
262                ..Default::default()
263            },
264        )
265    }
266
267    /// Append a record to the WAL. Returns the assigned LSN.
268    ///
269    /// The record is written to the in-memory buffer. Call `sync()` to
270    /// flush to disk and make the write durable.
271    ///
272    /// `database_id` is stored in header bytes 34-41. Pass `0` for the
273    /// default database (backward-compatible with pre-existing records).
274    pub fn append(
275        &mut self,
276        record_type: u32,
277        tenant_id: u64,
278        vshard_id: u32,
279        database_id: u64,
280        payload: &[u8],
281    ) -> Result<u64> {
282        if self.sealed {
283            return Err(WalError::Sealed);
284        }
285
286        let lsn = self.next_lsn.fetch_add(1, Ordering::Relaxed);
287        let preamble_bytes = self.segment_preamble.as_ref().map(|p| p.to_bytes());
288        let record = WalRecord::new(
289            record_type,
290            lsn,
291            tenant_id,
292            vshard_id,
293            database_id,
294            payload.to_vec(),
295            self.encryption_ring.as_ref().map(|r| r.current()),
296            preamble_bytes.as_ref(),
297        )?;
298
299        // Write to double-write buffer (deferred — no fsync yet).
300        // The DWB is fsynced in batch during `sync()`, before the WAL fsync.
301        // This amortizes DWB fsync cost across the entire group commit batch.
302        //
303        // DWB failure is non-fatal for the write itself (the WAL is the
304        // authoritative store), but we log a warning because it means
305        // torn-write recovery is degraded. If the DWB is persistently
306        // broken, we detach it to avoid repeated error noise.
307        if let Some(dwb) = &mut self.double_write
308            && let Err(e) = dwb.write_record_deferred(&record)
309        {
310            tracing::warn!(
311                lsn = lsn,
312                error = %e,
313                "DWB write failed — torn-write protection degraded, detaching DWB"
314            );
315            self.double_write = None;
316        }
317
318        let header_bytes = record.header.to_bytes();
319        let total_size = HEADER_SIZE + record.payload.len();
320
321        // If this record doesn't fit in the remaining buffer, flush first.
322        if self.buffer.remaining() < total_size {
323            self.flush_buffer()?;
324        }
325
326        // If the record is larger than the entire buffer, we have a problem.
327        // This shouldn't happen with MAX_WAL_PAYLOAD_SIZE checks, but guard anyway.
328        if total_size > self.buffer.capacity() {
329            return Err(WalError::PayloadTooLarge {
330                size: record.payload.len(),
331                max: self.buffer.capacity() - HEADER_SIZE,
332            });
333        }
334
335        self.buffer.write(&header_bytes);
336        self.buffer.write(&record.payload);
337
338        Ok(lsn)
339    }
340
341    /// Flush the write buffer to disk (group commit).
342    ///
343    /// This issues a single write + fsync for all records accumulated
344    /// since the last flush. The DWB is also fsynced (one fsync for all
345    /// deferred DWB writes in this batch).
346    pub fn sync(&mut self) -> Result<()> {
347        if self.buffer.is_empty() {
348            return Ok(());
349        }
350        // Flush DWB first — records must be durable in DWB before WAL.
351        // If DWB flush fails, torn-write protection is lost for this batch.
352        // We log a warning and detach the DWB rather than silently continuing
353        // as if torn-write protection is active.
354        if let Some(dwb) = &mut self.double_write
355            && let Err(e) = dwb.flush()
356        {
357            tracing::warn!(
358                error = %e,
359                "DWB flush failed — torn-write protection lost for this batch, detaching DWB"
360            );
361            self.double_write = None;
362        }
363        self.flush_buffer()?;
364        self.file.sync_all()?;
365        Ok(())
366    }
367
368    /// Seal the WAL — no more writes will be accepted.
369    ///
370    /// Flushes any buffered data before sealing.
371    pub fn seal(&mut self) -> Result<()> {
372        self.sync()?;
373        self.sealed = true;
374        Ok(())
375    }
376
377    /// The next LSN that will be assigned.
378    pub fn next_lsn(&self) -> u64 {
379        self.next_lsn.load(Ordering::Relaxed)
380    }
381
    /// Offset at which the next flush will write, i.e. the number of bytes
    /// handed to the file so far (plus any recovered starting offset).
    ///
    /// Records still sitting in the write buffer are not counted. With
    /// O_DIRECT each flush writes the aligned buffer slice, so this may
    /// include padding beyond the record bytes.
    pub fn file_offset(&self) -> u64 {
        self.file_offset
    }
386
387    /// Flush the aligned buffer to the file.
388    fn flush_buffer(&mut self) -> Result<()> {
389        if self.buffer.is_empty() {
390            return Ok(());
391        }
392
393        let data = if self.config.use_direct_io {
394            // O_DIRECT requires aligned I/O size.
395            self.buffer.as_aligned_slice()
396        } else {
397            // Without O_DIRECT, write only the actual data.
398            self.buffer.as_slice()
399        };
400
401        // Use pwrite to write at the exact offset, retrying on short writes.
402        #[cfg(unix)]
403        {
404            use std::os::unix::io::AsRawFd;
405            let fd = self.file.as_raw_fd();
406            let mut remaining = data;
407            let mut write_offset = self.file_offset;
408            while !remaining.is_empty() {
409                let written = unsafe {
410                    libc::pwrite(
411                        fd,
412                        remaining.as_ptr() as *const libc::c_void,
413                        remaining.len(),
414                        write_offset as libc::off_t,
415                    )
416                };
417                if written < 0 {
418                    return Err(WalError::Io(std::io::Error::last_os_error()));
419                }
420                let n = written as usize;
421                remaining = &remaining[n..];
422                write_offset += n as u64;
423            }
424        }
425
426        self.file_offset += data.len() as u64;
427        self.buffer.clear();
428        Ok(())
429    }
430}
431
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Creates a buffered (non-O_DIRECT) writer on `test.wal` inside a fresh
    /// temporary directory.
    ///
    /// The directory guard is returned alongside the writer so the directory
    /// stays alive for the duration of the test.
    fn scratch_writer() -> (tempfile::TempDir, WalWriter) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");
        let writer = WalWriter::open_without_direct_io(&path).unwrap();
        (dir, writer)
    }

    /// Appends a `Put` record for tenant 1, vshard 0, database 0.
    fn put(writer: &mut WalWriter, payload: &[u8]) -> Result<u64> {
        writer.append(RecordType::Put as u32, 1, 0, 0, payload)
    }

    /// A single record gets LSN 1, and syncing it advances the file offset.
    #[test]
    fn write_and_sync_single_record() {
        let (_dir, mut writer) = scratch_writer();

        let lsn = put(&mut writer, b"hello").unwrap();
        assert_eq!(lsn, 1);

        writer.sync().unwrap();
        assert!(writer.file_offset() > 0);
    }

    /// Consecutive appends on a fresh writer are numbered 1, 2, 3.
    #[test]
    fn lsn_increments() {
        let (_dir, mut writer) = scratch_writer();

        let lsn1 = put(&mut writer, b"first").unwrap();
        let lsn2 = put(&mut writer, b"second").unwrap();
        let lsn3 = put(&mut writer, b"third").unwrap();

        assert_eq!(lsn1, 1);
        assert_eq!(lsn2, 2);
        assert_eq!(lsn3, 3);
    }

    /// Once sealed, the writer refuses further appends with `Sealed`.
    #[test]
    fn sealed_writer_rejects_writes() {
        let (_dir, mut writer) = scratch_writer();
        writer.seal().unwrap();

        let outcome = put(&mut writer, b"rejected");
        assert!(matches!(outcome, Err(WalError::Sealed)));
    }
}
487}