1use std::fs::{File, OpenOptions};
24use std::os::unix::fs::OpenOptionsExt;
25use std::path::Path;
26use std::sync::atomic::{AtomicU64, Ordering};
27
28use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT};
29use crate::double_write::DwbMode;
30use crate::error::{Result, WalError};
31use crate::preamble::SegmentPreamble;
32use crate::record::{HEADER_SIZE, WalRecord};
33
/// Default capacity of the writer's in-memory buffer, in bytes (2 MiB).
///
/// `WalWriterConfig::default()` uses this value for `write_buffer_size`.
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 2 * 1024 * 1024;
42
/// Tunables for a [`WalWriter`].
#[derive(Debug, Clone)]
pub struct WalWriterConfig {
    /// Capacity, in bytes, requested for the writer's in-memory buffer
    /// (passed to `AlignedBuf::new`).
    pub write_buffer_size: usize,

    /// Alignment passed to `AlignedBuf::new` when the buffer is allocated.
    pub alignment: usize,

    /// When `true`, the segment file is opened with `O_DIRECT` and flushes
    /// write the buffer's aligned slice instead of its plain slice.
    pub use_direct_io: bool,

    /// Explicit double-write-buffer mode. `None` defers the choice to
    /// `DwbMode::default_for_parent(use_direct_io)`.
    pub dwb_mode: Option<DwbMode>,
}
61
62impl Default for WalWriterConfig {
63 fn default() -> Self {
64 Self {
65 write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
66 alignment: DEFAULT_ALIGNMENT,
67 use_direct_io: true,
68 dwb_mode: None,
69 }
70 }
71}
72
73fn resolve_dwb_mode(config: &WalWriterConfig) -> DwbMode {
74 config
75 .dwb_mode
76 .unwrap_or_else(|| DwbMode::default_for_parent(config.use_direct_io))
77}
78
79fn open_dwb_for(
80 config: &WalWriterConfig,
81 path: &Path,
82) -> Option<crate::double_write::DoubleWriteBuffer> {
83 let mode = resolve_dwb_mode(config);
84 if mode == DwbMode::Off {
85 return None;
86 }
87 let dwb_path = path.with_extension("dwb");
88 match crate::double_write::DoubleWriteBuffer::open(&dwb_path, mode) {
89 Ok(d) => Some(d),
90 Err(e) => {
91 tracing::warn!(
92 path = %dwb_path.display(),
93 error = %e,
94 mode = ?mode,
95 "failed to open DWB — torn-write protection disabled for this writer"
96 );
97 None
98 }
99 }
100}
101
/// Buffered, append-only writer for a single WAL segment file.
pub struct WalWriter {
    /// Handle to the segment file; all flushes are positional writes to it.
    file: File,

    /// In-memory staging buffer. Records are copied here by `append` and
    /// written to `file` when the buffer fills up or on `sync`.
    buffer: AlignedBuf,

    /// File offset at which the next buffer flush will be written.
    file_offset: u64,

    /// LSN that the next successful `append` call will hand out.
    next_lsn: AtomicU64,

    /// Once `true` (set by `seal`), `append` rejects every call.
    sealed: bool,

    /// Configuration the writer was opened with.
    config: WalWriterConfig,

    /// Key ring installed by `set_encryption_ring`; its current key is passed
    /// to `WalRecord::new` for every appended record. `None` means no key.
    encryption_ring: Option<crate::crypto::KeyRing>,

    /// Preamble created by `set_encryption_ring`. Its bytes are staged at the
    /// start of the buffer and also handed to `WalRecord::new` on each append.
    segment_preamble: Option<SegmentPreamble>,

    /// Optional double-write buffer. Set to `None` (detached) if opening it
    /// failed or if a DWB write/flush fails later on.
    double_write: Option<crate::double_write::DoubleWriteBuffer>,
}
134
135impl WalWriter {
136 pub fn open(path: &Path, config: WalWriterConfig) -> Result<Self> {
138 let mut opts = OpenOptions::new();
139 opts.create(true).write(true).append(false);
140
141 if config.use_direct_io {
142 opts.custom_flags(libc::O_DIRECT);
144 }
145
146 let file = opts.open(path)?;
147
148 let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
149
150 let (file_offset, next_lsn) = if path.exists() && std::fs::metadata(path)?.len() > 0 {
152 let info = crate::recovery::recover(path)?;
153 (info.end_offset, info.next_lsn())
154 } else {
155 (0, 1)
156 };
157
158 let double_write = open_dwb_for(&config, path);
159
160 Ok(Self {
161 file,
162 buffer,
163 file_offset,
164 next_lsn: AtomicU64::new(next_lsn),
165 sealed: false,
166 config,
167 encryption_ring: None,
168 segment_preamble: None,
169 double_write,
170 })
171 }
172
173 pub fn set_encryption_key(&mut self, key: crate::crypto::WalEncryptionKey) -> Result<()> {
180 self.set_encryption_ring(crate::crypto::KeyRing::new(key))
181 }
182
183 pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) -> Result<()> {
189 if self.file_offset != 0 || !self.buffer.is_empty() {
190 return Err(WalError::EncryptionError {
191 detail: "set_encryption_ring must be called before writing any records".into(),
192 });
193 }
194 let epoch = *ring.current().epoch();
195 let preamble = SegmentPreamble::new_wal(epoch);
196 let preamble_bytes = preamble.to_bytes();
197
198 self.buffer.write(&preamble_bytes);
201
202 self.encryption_ring = Some(ring);
203 self.segment_preamble = Some(preamble);
204 Ok(())
205 }
206
207 pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
209 self.encryption_ring.as_ref()
210 }
211
212 pub fn segment_preamble(&self) -> Option<&SegmentPreamble> {
214 self.segment_preamble.as_ref()
215 }
216
217 pub fn open_with_start_lsn(
223 path: &Path,
224 config: WalWriterConfig,
225 start_lsn: u64,
226 ) -> Result<Self> {
227 let mut opts = OpenOptions::new();
228 opts.create(true).write(true).append(false);
229
230 if config.use_direct_io {
231 opts.custom_flags(libc::O_DIRECT);
232 }
233
234 let file = opts.open(path)?;
235 let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
236
237 let double_write = open_dwb_for(&config, path);
238
239 Ok(Self {
240 file,
241 buffer,
242 file_offset: 0,
243 next_lsn: AtomicU64::new(start_lsn),
244 sealed: false,
245 config,
246 encryption_ring: None,
247 segment_preamble: None,
248 double_write,
249 })
250 }
251
252 pub fn open_without_direct_io(path: &Path) -> Result<Self> {
254 Self::open(
255 path,
256 WalWriterConfig {
257 use_direct_io: false,
258 ..Default::default()
259 },
260 )
261 }
262
263 pub fn append(
271 &mut self,
272 record_type: u32,
273 tenant_id: u64,
274 vshard_id: u32,
275 database_id: u64,
276 payload: &[u8],
277 ) -> Result<u64> {
278 if self.sealed {
279 return Err(WalError::Sealed);
280 }
281
282 let lsn = self.next_lsn.fetch_add(1, Ordering::Relaxed);
283 let preamble_bytes = self.segment_preamble.as_ref().map(|p| p.to_bytes());
284 let record = WalRecord::new(
285 record_type,
286 lsn,
287 tenant_id,
288 vshard_id,
289 database_id,
290 payload.to_vec(),
291 self.encryption_ring.as_ref().map(|r| r.current()),
292 preamble_bytes.as_ref(),
293 )?;
294
295 if let Some(dwb) = &mut self.double_write
304 && let Err(e) = dwb.write_record_deferred(&record)
305 {
306 tracing::warn!(
307 lsn = lsn,
308 error = %e,
309 "DWB write failed — torn-write protection degraded, detaching DWB"
310 );
311 self.double_write = None;
312 }
313
314 let header_bytes = record.header.to_bytes();
315 let total_size = HEADER_SIZE + record.payload.len();
316
317 if self.buffer.remaining() < total_size {
319 self.flush_buffer()?;
320 }
321
322 if total_size > self.buffer.capacity() {
325 return Err(WalError::PayloadTooLarge {
326 size: record.payload.len(),
327 max: self.buffer.capacity() - HEADER_SIZE,
328 });
329 }
330
331 self.buffer.write(&header_bytes);
332 self.buffer.write(&record.payload);
333
334 Ok(lsn)
335 }
336
337 pub fn sync(&mut self) -> Result<()> {
343 if self.buffer.is_empty() {
344 return Ok(());
345 }
346 if let Some(dwb) = &mut self.double_write
351 && let Err(e) = dwb.flush()
352 {
353 tracing::warn!(
354 error = %e,
355 "DWB flush failed — torn-write protection lost for this batch, detaching DWB"
356 );
357 self.double_write = None;
358 }
359 self.flush_buffer()?;
360 self.file.sync_all()?;
361 Ok(())
362 }
363
364 pub fn seal(&mut self) -> Result<()> {
368 self.sync()?;
369 self.sealed = true;
370 Ok(())
371 }
372
373 pub fn next_lsn(&self) -> u64 {
375 self.next_lsn.load(Ordering::Relaxed)
376 }
377
378 pub fn file_offset(&self) -> u64 {
380 self.file_offset
381 }
382
383 fn flush_buffer(&mut self) -> Result<()> {
385 if self.buffer.is_empty() {
386 return Ok(());
387 }
388
389 let data = if self.config.use_direct_io {
390 self.buffer.as_aligned_slice()
392 } else {
393 self.buffer.as_slice()
395 };
396
397 #[cfg(unix)]
399 {
400 use std::os::unix::io::AsRawFd;
401 let fd = self.file.as_raw_fd();
402 let mut remaining = data;
403 let mut write_offset = self.file_offset;
404 while !remaining.is_empty() {
405 let written = unsafe {
406 libc::pwrite(
407 fd,
408 remaining.as_ptr() as *const libc::c_void,
409 remaining.len(),
410 write_offset as libc::off_t,
411 )
412 };
413 if written < 0 {
414 return Err(WalError::Io(std::io::Error::last_os_error()));
415 }
416 let n = written as usize;
417 remaining = &remaining[n..];
418 write_offset += n as u64;
419 }
420 }
421
422 self.file_offset += data.len() as u64;
423 self.buffer.clear();
424 Ok(())
425 }
426}
427
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Creates a buffered-I/O writer on a fresh `test.wal` inside a new
    /// temporary directory. The directory guard is returned alongside the
    /// writer so the directory stays alive for the duration of the test.
    fn fresh_writer() -> (tempfile::TempDir, WalWriter) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");
        let writer = WalWriter::open_without_direct_io(&path).unwrap();
        (dir, writer)
    }

    /// A single append gets LSN 1, and syncing moves the file offset forward.
    #[test]
    fn write_and_sync_single_record() {
        let (_dir, mut writer) = fresh_writer();

        let lsn = writer
            .append(RecordType::Put as u32, 1, 0, 0, b"hello")
            .unwrap();
        assert_eq!(lsn, 1);

        writer.sync().unwrap();
        assert!(writer.file_offset() > 0);
    }

    /// Consecutive appends receive consecutive LSNs starting at 1.
    #[test]
    fn lsn_increments() {
        let (_dir, mut writer) = fresh_writer();

        let payloads: [&[u8]; 3] = [b"first", b"second", b"third"];
        let mut lsns = Vec::with_capacity(payloads.len());
        for payload in payloads {
            let lsn = writer
                .append(RecordType::Put as u32, 1, 0, 0, payload)
                .unwrap();
            lsns.push(lsn);
        }

        assert_eq!(lsns[0], 1);
        assert_eq!(lsns[1], 2);
        assert_eq!(lsns[2], 3);
    }

    /// After `seal`, `append` fails with `WalError::Sealed`.
    #[test]
    fn sealed_writer_rejects_writes() {
        let (_dir, mut writer) = fresh_writer();
        writer.seal().unwrap();

        let outcome = writer.append(RecordType::Put as u32, 1, 0, 0, b"rejected");
        assert!(matches!(outcome, Err(WalError::Sealed)));
    }
}