1use std::fs::{File, OpenOptions};
22use std::os::unix::fs::OpenOptionsExt;
23use std::path::Path;
24use std::sync::atomic::{AtomicU64, Ordering};
25
26use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT};
27use crate::double_write::DwbMode;
28use crate::error::{Result, WalError};
29use crate::record::{HEADER_SIZE, WalRecord};
30
/// Default capacity of the in-memory write buffer: 2 MiB (`2 * 1024 * 1024` bytes).
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 2 << 20;
39
/// Tuning knobs for [`WalWriter`].
#[derive(Debug, Clone)]
pub struct WalWriterConfig {
    /// Capacity in bytes of the in-memory write buffer handed to `AlignedBuf::new`.
    /// A single record (header plus encoded payload) larger than this is rejected
    /// by `WalWriter::append` with `WalError::PayloadTooLarge`.
    pub write_buffer_size: usize,

    /// Alignment handed to `AlignedBuf::new`.
    /// NOTE(review): presumably must satisfy the device's O_DIRECT alignment when
    /// `use_direct_io` is set — confirm against `crate::align`.
    pub alignment: usize,

    /// Open the WAL file with `O_DIRECT` and flush the aligned (padded) view of the
    /// write buffer. Also feeds `DwbMode::default_for_parent` when `dwb_mode` is `None`.
    pub use_direct_io: bool,

    /// Explicit double-write-buffer mode. `None` defers to
    /// `DwbMode::default_for_parent(use_direct_io)`; `Some(DwbMode::Off)` disables the DWB.
    pub dwb_mode: Option<DwbMode>,
}
58
59impl Default for WalWriterConfig {
60 fn default() -> Self {
61 Self {
62 write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
63 alignment: DEFAULT_ALIGNMENT,
64 use_direct_io: true,
65 dwb_mode: None,
66 }
67 }
68}
69
70fn resolve_dwb_mode(config: &WalWriterConfig) -> DwbMode {
71 config
72 .dwb_mode
73 .unwrap_or_else(|| DwbMode::default_for_parent(config.use_direct_io))
74}
75
76fn open_dwb_for(
77 config: &WalWriterConfig,
78 path: &Path,
79) -> Option<crate::double_write::DoubleWriteBuffer> {
80 let mode = resolve_dwb_mode(config);
81 if mode == DwbMode::Off {
82 return None;
83 }
84 let dwb_path = path.with_extension("dwb");
85 match crate::double_write::DoubleWriteBuffer::open(&dwb_path, mode) {
86 Ok(d) => Some(d),
87 Err(e) => {
88 tracing::warn!(
89 path = %dwb_path.display(),
90 error = %e,
91 mode = ?mode,
92 "failed to open DWB — torn-write protection disabled for this writer"
93 );
94 None
95 }
96 }
97}
98
/// Buffered, append-only writer for a single WAL file.
///
/// Records are staged in an aligned in-memory buffer by `append` and written to
/// the file with positional writes when the buffer fills or on `sync`.
pub struct WalWriter {
    /// Destination WAL file; written positionally at `file_offset`.
    file: File,

    /// Records staged by `append` until `flush_buffer` writes them out.
    buffer: AlignedBuf,

    /// Byte offset at which the next flush writes. Advances by the number of bytes
    /// written, which is the padded length when direct I/O is enabled.
    file_offset: u64,

    /// LSN assigned to the next appended record.
    /// NOTE(review): atomic, although every mutation in this file goes through `&mut self`.
    next_lsn: AtomicU64,

    /// Set by `seal`; once true, `append` fails with `WalError::Sealed`.
    sealed: bool,

    /// Configuration the writer was opened with.
    config: WalWriterConfig,

    /// Optional key ring; when set, `append` passes its current key to `WalRecord::new`.
    encryption_ring: Option<crate::crypto::KeyRing>,

    /// Optional double-write buffer; set to `None` (detached) after any DWB write
    /// or flush failure.
    double_write: Option<crate::double_write::DoubleWriteBuffer>,
}
126
127impl WalWriter {
128 pub fn open(path: &Path, config: WalWriterConfig) -> Result<Self> {
130 let mut opts = OpenOptions::new();
131 opts.create(true).write(true).append(false);
132
133 if config.use_direct_io {
134 opts.custom_flags(libc::O_DIRECT);
136 }
137
138 let file = opts.open(path)?;
139
140 let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
141
142 let (file_offset, next_lsn) = if path.exists() && std::fs::metadata(path)?.len() > 0 {
144 let info = crate::recovery::recover(path)?;
145 (info.end_offset, info.next_lsn())
146 } else {
147 (0, 1)
148 };
149
150 let double_write = open_dwb_for(&config, path);
151
152 Ok(Self {
153 file,
154 buffer,
155 file_offset,
156 next_lsn: AtomicU64::new(next_lsn),
157 sealed: false,
158 config,
159 encryption_ring: None,
160 double_write,
161 })
162 }
163
164 pub fn set_encryption_key(&mut self, key: crate::crypto::WalEncryptionKey) {
167 self.encryption_ring = Some(crate::crypto::KeyRing::new(key));
168 }
169
170 pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) {
172 self.encryption_ring = Some(ring);
173 }
174
175 pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
177 self.encryption_ring.as_ref()
178 }
179
180 pub fn open_with_start_lsn(
186 path: &Path,
187 config: WalWriterConfig,
188 start_lsn: u64,
189 ) -> Result<Self> {
190 let mut opts = OpenOptions::new();
191 opts.create(true).write(true).append(false);
192
193 if config.use_direct_io {
194 opts.custom_flags(libc::O_DIRECT);
195 }
196
197 let file = opts.open(path)?;
198 let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
199
200 let double_write = open_dwb_for(&config, path);
201
202 Ok(Self {
203 file,
204 buffer,
205 file_offset: 0,
206 next_lsn: AtomicU64::new(start_lsn),
207 sealed: false,
208 config,
209 encryption_ring: None,
210 double_write,
211 })
212 }
213
214 pub fn open_without_direct_io(path: &Path) -> Result<Self> {
216 Self::open(
217 path,
218 WalWriterConfig {
219 use_direct_io: false,
220 ..Default::default()
221 },
222 )
223 }
224
225 pub fn append(
230 &mut self,
231 record_type: u16,
232 tenant_id: u32,
233 vshard_id: u16,
234 payload: &[u8],
235 ) -> Result<u64> {
236 if self.sealed {
237 return Err(WalError::Sealed);
238 }
239
240 let lsn = self.next_lsn.fetch_add(1, Ordering::Relaxed);
241 let record = WalRecord::new(
242 record_type,
243 lsn,
244 tenant_id,
245 vshard_id,
246 payload.to_vec(),
247 self.encryption_ring.as_ref().map(|r| r.current()),
248 )?;
249
250 if let Some(dwb) = &mut self.double_write
259 && let Err(e) = dwb.write_record_deferred(&record)
260 {
261 tracing::warn!(
262 lsn = lsn,
263 error = %e,
264 "DWB write failed — torn-write protection degraded, detaching DWB"
265 );
266 self.double_write = None;
267 }
268
269 let header_bytes = record.header.to_bytes();
270 let total_size = HEADER_SIZE + record.payload.len();
271
272 if self.buffer.remaining() < total_size {
274 self.flush_buffer()?;
275 }
276
277 if total_size > self.buffer.capacity() {
280 return Err(WalError::PayloadTooLarge {
281 size: record.payload.len(),
282 max: self.buffer.capacity() - HEADER_SIZE,
283 });
284 }
285
286 self.buffer.write(&header_bytes);
287 self.buffer.write(&record.payload);
288
289 Ok(lsn)
290 }
291
292 pub fn sync(&mut self) -> Result<()> {
298 if self.buffer.is_empty() {
299 return Ok(());
300 }
301 if let Some(dwb) = &mut self.double_write
306 && let Err(e) = dwb.flush()
307 {
308 tracing::warn!(
309 error = %e,
310 "DWB flush failed — torn-write protection lost for this batch, detaching DWB"
311 );
312 self.double_write = None;
313 }
314 self.flush_buffer()?;
315 self.file.sync_all()?;
316 Ok(())
317 }
318
319 pub fn seal(&mut self) -> Result<()> {
323 self.sync()?;
324 self.sealed = true;
325 Ok(())
326 }
327
328 pub fn next_lsn(&self) -> u64 {
330 self.next_lsn.load(Ordering::Relaxed)
331 }
332
333 pub fn file_offset(&self) -> u64 {
335 self.file_offset
336 }
337
338 fn flush_buffer(&mut self) -> Result<()> {
340 if self.buffer.is_empty() {
341 return Ok(());
342 }
343
344 let data = if self.config.use_direct_io {
345 self.buffer.as_aligned_slice()
347 } else {
348 self.buffer.as_slice()
350 };
351
352 #[cfg(unix)]
354 {
355 use std::os::unix::io::AsRawFd;
356 let fd = self.file.as_raw_fd();
357 let mut remaining = data;
358 let mut write_offset = self.file_offset;
359 while !remaining.is_empty() {
360 let written = unsafe {
361 libc::pwrite(
362 fd,
363 remaining.as_ptr() as *const libc::c_void,
364 remaining.len(),
365 write_offset as libc::off_t,
366 )
367 };
368 if written < 0 {
369 return Err(WalError::Io(std::io::Error::last_os_error()));
370 }
371 let n = written as usize;
372 remaining = &remaining[n..];
373 write_offset += n as u64;
374 }
375 }
376
377 self.file_offset += data.len() as u64;
378 self.buffer.clear();
379 Ok(())
380 }
381}
382
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Opens a buffered (non-direct-I/O) writer on `test.wal` in a fresh temp dir.
    /// The `TempDir` guard is returned too, so the directory outlives the writer.
    fn scratch_writer() -> (tempfile::TempDir, WalWriter) {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");
        let writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
        (dir, writer)
    }

    /// Appends a `Put` record for tenant 1, vshard 0.
    fn put(writer: &mut WalWriter, payload: &[u8]) -> Result<u64> {
        writer.append(RecordType::Put as u16, 1, 0, payload)
    }

    #[test]
    fn write_and_sync_single_record() {
        let (_dir, mut writer) = scratch_writer();

        assert_eq!(put(&mut writer, b"hello").unwrap(), 1);

        writer.sync().unwrap();
        assert!(writer.file_offset() > 0);
    }

    #[test]
    fn lsn_increments() {
        let (_dir, mut writer) = scratch_writer();

        let payloads: [&[u8]; 3] = [b"first", b"second", b"third"];
        let assigned: Vec<u64> = payloads
            .iter()
            .map(|body| put(&mut writer, body).unwrap())
            .collect();

        assert_eq!(assigned, vec![1, 2, 3]);
    }

    #[test]
    fn sealed_writer_rejects_writes() {
        let (_dir, mut writer) = scratch_writer();
        writer.seal().unwrap();

        let outcome = put(&mut writer, b"rejected");
        assert!(matches!(outcome, Err(WalError::Sealed)));
    }
}