Skip to main content

nodedb_wal/
writer.rs

1//! WAL writer with O_DIRECT and group commit.
2//!
3//! The writer accumulates records into an aligned buffer and flushes to disk
4//! when the buffer is full or when an explicit sync is requested.
5//!
6//! ## I/O path
7//!
8//! 1. Caller creates a `WalRecord` and submits it to the writer.
9//! 2. Writer serializes the record into the aligned write buffer.
//! 3. When the buffer is full, it is written to the WAL file (via `O_DIRECT`
//!    when enabled); `sync()` writes any remaining records and then `fsync`s.
//! 4. Group commit: every record appended between two `sync()` calls shares
//!    a single `fsync`. Appends take `&mut self`, so concurrent producers
//!    must serialize access to the writer externally.
14//!
15//! ## Future: io_uring
16//!
17//! The current implementation uses standard `pwrite` + `fsync` with O_DIRECT.
18//! Phase 1 validates correctness and determinism. io_uring submission will be
19//! added once the bridge crate provides the TPC event loop integration.
20
21use std::fs::{File, OpenOptions};
22use std::os::unix::fs::OpenOptionsExt;
23use std::path::Path;
24use std::sync::atomic::{AtomicU64, Ordering};
25
26use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT};
27use crate::error::{Result, WalError};
28use crate::record::{HEADER_SIZE, WalRecord};
29
/// Default capacity of the group-commit write buffer: 2 MiB.
///
/// Appended records are batched in this buffer until it fills up or
/// `sync()` is called; it also bounds the size of a single record.
///
/// Keep in step with the `WalTuning::write_buffer_size` default. Individual
/// writers can override it through `WalWriterConfig::write_buffer_size`.
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 2 << 20; // 2 MiB
38
/// Configuration for the WAL writer.
///
/// Consumed by `WalWriter::open` / `WalWriter::open_with_start_lsn`; see the
/// `Default` impl for production defaults.
#[derive(Debug, Clone)]
pub struct WalWriterConfig {
    /// Size in bytes of the aligned write buffer, i.e. the largest
    /// group-commit batch and the upper bound on a single record
    /// (header + payload). Passed to `AlignedBuf::new`; presumably rounded
    /// up to `alignment` there — confirm in `crate::align`.
    pub write_buffer_size: usize,

    /// O_DIRECT alignment in bytes (typically 4096 for NVMe). Also used as
    /// the alignment of the in-memory write buffer.
    pub alignment: usize,

    /// Whether to open the WAL file with `O_DIRECT`. Set to `false` for
    /// testing on filesystems that don't support it (e.g., tmpfs).
    pub use_direct_io: bool,
}
52
53impl Default for WalWriterConfig {
54    fn default() -> Self {
55        Self {
56            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
57            alignment: DEFAULT_ALIGNMENT,
58            use_direct_io: true,
59        }
60    }
61}
62
/// Append-only WAL writer.
///
/// Records are staged in an aligned in-memory buffer and written to the
/// file in batches; they only become durable once `sync()` issues an
/// `fsync`. Every mutating method takes `&mut self`, so concurrent producers
/// must synchronize access to a writer externally.
pub struct WalWriter {
    /// The WAL file handle (opened with O_DIRECT if configured).
    file: File,

    /// Aligned write buffer for batching records before flush.
    buffer: AlignedBuf,

    /// Byte offset in the file at which the next batch is written.
    ///
    /// NOTE(review): `O_DIRECT` requires this to remain a multiple of
    /// `config.alignment`; confirm every update of this field preserves that.
    file_offset: u64,

    /// Next LSN to assign.
    ///
    /// Only mutated from `&mut self` methods, so the atomic type is not
    /// needed for cross-thread coordination within this file.
    next_lsn: AtomicU64,

    /// Whether the writer has been sealed (no more writes accepted).
    sealed: bool,

    /// Configuration.
    config: WalWriterConfig,

    /// Optional key ring for payload encryption (supports key rotation).
    /// Its current key is handed to `WalRecord::new` for every appended record.
    encryption_ring: Option<crate::crypto::KeyRing>,

    /// Optional double-write buffer for torn write protection.
    /// Records are written here before the WAL for crash recovery.
    /// `None` if it could not be opened or was detached after an I/O error.
    double_write: Option<crate::double_write::DoubleWriteBuffer>,
}
90
91impl WalWriter {
92    /// Open or create a WAL file at the given path.
93    pub fn open(path: &Path, config: WalWriterConfig) -> Result<Self> {
94        let mut opts = OpenOptions::new();
95        opts.create(true).write(true).append(false);
96
97        if config.use_direct_io {
98            // O_DIRECT: bypass page cache.
99            opts.custom_flags(libc::O_DIRECT);
100        }
101
102        let file = opts.open(path)?;
103
104        let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
105
106        // Scan existing WAL for recovery if the file has data.
107        let (file_offset, next_lsn) = if path.exists() && std::fs::metadata(path)?.len() > 0 {
108            let info = crate::recovery::recover(path)?;
109            (info.end_offset, info.next_lsn())
110        } else {
111            (0, 1)
112        };
113
114        // Open double-write buffer alongside the WAL file.
115        let dwb_path = path.with_extension("dwb");
116        let double_write = crate::double_write::DoubleWriteBuffer::open(&dwb_path).ok();
117
118        Ok(Self {
119            file,
120            buffer,
121            file_offset,
122            next_lsn: AtomicU64::new(next_lsn),
123            sealed: false,
124            config,
125            encryption_ring: None,
126            double_write,
127        })
128    }
129
130    /// Set the encryption key. When set, all subsequent records will have
131    /// their payloads encrypted with AES-256-GCM.
132    pub fn set_encryption_key(&mut self, key: crate::crypto::WalEncryptionKey) {
133        self.encryption_ring = Some(crate::crypto::KeyRing::new(key));
134    }
135
136    /// Set the key ring directly (for key rotation with dual-key reads).
137    pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) {
138        self.encryption_ring = Some(ring);
139    }
140
141    /// Access the key ring (for decryption during replay).
142    pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
143        self.encryption_ring.as_ref()
144    }
145
146    /// Open a new WAL segment file with a specific starting LSN.
147    ///
148    /// Used by `SegmentedWal` when rolling to a new segment. The file must
149    /// not already exist (or be empty). The writer will assign LSNs starting
150    /// from `start_lsn`.
151    pub fn open_with_start_lsn(
152        path: &Path,
153        config: WalWriterConfig,
154        start_lsn: u64,
155    ) -> Result<Self> {
156        let mut opts = OpenOptions::new();
157        opts.create(true).write(true).append(false);
158
159        if config.use_direct_io {
160            opts.custom_flags(libc::O_DIRECT);
161        }
162
163        let file = opts.open(path)?;
164        let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
165
166        let dwb_path = path.with_extension("dwb");
167        let double_write = crate::double_write::DoubleWriteBuffer::open(&dwb_path).ok();
168
169        Ok(Self {
170            file,
171            buffer,
172            file_offset: 0,
173            next_lsn: AtomicU64::new(start_lsn),
174            sealed: false,
175            config,
176            encryption_ring: None,
177            double_write,
178        })
179    }
180
181    /// Open a WAL writer with O_DIRECT disabled (for testing on tmpfs, etc.).
182    pub fn open_without_direct_io(path: &Path) -> Result<Self> {
183        Self::open(
184            path,
185            WalWriterConfig {
186                use_direct_io: false,
187                ..Default::default()
188            },
189        )
190    }
191
192    /// Append a record to the WAL. Returns the assigned LSN.
193    ///
194    /// The record is written to the in-memory buffer. Call `sync()` to
195    /// flush to disk and make the write durable.
196    pub fn append(
197        &mut self,
198        record_type: u16,
199        tenant_id: u32,
200        vshard_id: u16,
201        payload: &[u8],
202    ) -> Result<u64> {
203        if self.sealed {
204            return Err(WalError::Sealed);
205        }
206
207        let lsn = self.next_lsn.fetch_add(1, Ordering::Relaxed);
208        let record = WalRecord::new(
209            record_type,
210            lsn,
211            tenant_id,
212            vshard_id,
213            payload.to_vec(),
214            self.encryption_ring.as_ref().map(|r| r.current()),
215        )?;
216
217        // Write to double-write buffer (deferred — no fsync yet).
218        // The DWB is fsynced in batch during `sync()`, before the WAL fsync.
219        // This amortizes DWB fsync cost across the entire group commit batch.
220        //
221        // DWB failure is non-fatal for the write itself (the WAL is the
222        // authoritative store), but we log a warning because it means
223        // torn-write recovery is degraded. If the DWB is persistently
224        // broken, we detach it to avoid repeated error noise.
225        if let Some(dwb) = &mut self.double_write
226            && let Err(e) = dwb.write_record_deferred(&record)
227        {
228            tracing::warn!(
229                lsn = lsn,
230                error = %e,
231                "DWB write failed — torn-write protection degraded, detaching DWB"
232            );
233            self.double_write = None;
234        }
235
236        let header_bytes = record.header.to_bytes();
237        let total_size = HEADER_SIZE + record.payload.len();
238
239        // If this record doesn't fit in the remaining buffer, flush first.
240        if self.buffer.remaining() < total_size {
241            self.flush_buffer()?;
242        }
243
244        // If the record is larger than the entire buffer, we have a problem.
245        // This shouldn't happen with MAX_WAL_PAYLOAD_SIZE checks, but guard anyway.
246        if total_size > self.buffer.capacity() {
247            return Err(WalError::PayloadTooLarge {
248                size: record.payload.len(),
249                max: self.buffer.capacity() - HEADER_SIZE,
250            });
251        }
252
253        self.buffer.write(&header_bytes);
254        self.buffer.write(&record.payload);
255
256        Ok(lsn)
257    }
258
259    /// Flush the write buffer to disk (group commit).
260    ///
261    /// This issues a single write + fsync for all records accumulated
262    /// since the last flush. The DWB is also fsynced (one fsync for all
263    /// deferred DWB writes in this batch).
264    pub fn sync(&mut self) -> Result<()> {
265        if self.buffer.is_empty() {
266            return Ok(());
267        }
268        // Flush DWB first — records must be durable in DWB before WAL.
269        // If DWB flush fails, torn-write protection is lost for this batch.
270        // We log a warning and detach the DWB rather than silently continuing
271        // as if torn-write protection is active.
272        if let Some(dwb) = &mut self.double_write
273            && let Err(e) = dwb.flush()
274        {
275            tracing::warn!(
276                error = %e,
277                "DWB flush failed — torn-write protection lost for this batch, detaching DWB"
278            );
279            self.double_write = None;
280        }
281        self.flush_buffer()?;
282        self.file.sync_all()?;
283        Ok(())
284    }
285
286    /// Seal the WAL — no more writes will be accepted.
287    ///
288    /// Flushes any buffered data before sealing.
289    pub fn seal(&mut self) -> Result<()> {
290        self.sync()?;
291        self.sealed = true;
292        Ok(())
293    }
294
295    /// The next LSN that will be assigned.
296    pub fn next_lsn(&self) -> u64 {
297        self.next_lsn.load(Ordering::Relaxed)
298    }
299
300    /// Current file size (bytes written to disk).
301    pub fn file_offset(&self) -> u64 {
302        self.file_offset
303    }
304
305    /// Flush the aligned buffer to the file.
306    fn flush_buffer(&mut self) -> Result<()> {
307        if self.buffer.is_empty() {
308            return Ok(());
309        }
310
311        let data = if self.config.use_direct_io {
312            // O_DIRECT requires aligned I/O size.
313            self.buffer.as_aligned_slice()
314        } else {
315            // Without O_DIRECT, write only the actual data.
316            self.buffer.as_slice()
317        };
318
319        // Use pwrite to write at the exact offset.
320        #[cfg(unix)]
321        {
322            use std::os::unix::io::AsRawFd;
323            let fd = self.file.as_raw_fd();
324            let written = unsafe {
325                libc::pwrite(
326                    fd,
327                    data.as_ptr() as *const libc::c_void,
328                    data.len(),
329                    self.file_offset as libc::off_t,
330                )
331            };
332            if written < 0 {
333                return Err(WalError::Io(std::io::Error::last_os_error()));
334            }
335        }
336
337        self.file_offset += self.buffer.len() as u64;
338        self.buffer.clear();
339        Ok(())
340    }
341}
342
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Open a buffered-I/O writer on a fresh `test.wal` in its own temp dir.
    ///
    /// The `TempDir` guard is returned so the directory outlives the writer.
    fn fresh_writer() -> (tempfile::TempDir, WalWriter) {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");
        let writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
        (dir, writer)
    }

    #[test]
    fn write_and_sync_single_record() {
        let (_dir, mut writer) = fresh_writer();

        let assigned = writer
            .append(RecordType::Put as u16, 1, 0, b"hello")
            .unwrap();
        assert_eq!(assigned, 1);

        writer.sync().unwrap();
        assert!(writer.file_offset() > 0);
    }

    #[test]
    fn lsn_increments() {
        let (_dir, mut writer) = fresh_writer();

        let payloads: [&[u8]; 3] = [b"first", b"second", b"third"];
        let assigned: Vec<u64> = payloads
            .into_iter()
            .map(|payload| {
                writer
                    .append(RecordType::Put as u16, 1, 0, payload)
                    .unwrap()
            })
            .collect();

        assert_eq!(assigned, vec![1, 2, 3]);
    }

    #[test]
    fn sealed_writer_rejects_writes() {
        let (_dir, mut writer) = fresh_writer();
        writer.seal().unwrap();

        let outcome = writer.append(RecordType::Put as u16, 1, 0, b"rejected");
        assert!(matches!(outcome, Err(WalError::Sealed)));
    }
}