1use std::fs::{File, OpenOptions};
24use std::os::unix::fs::OpenOptionsExt;
25use std::path::Path;
26use std::sync::atomic::{AtomicU64, Ordering};
27
28use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT};
29use crate::double_write::DwbMode;
30use crate::error::{Result, WalError};
31use crate::preamble::SegmentPreamble;
32use crate::record::{HEADER_SIZE, WalRecord};
33
/// Default capacity of the writer's in-memory buffer, in bytes (2 MiB).
///
/// Used as `write_buffer_size` by `WalWriterConfig::default()`.
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 2 * 1024 * 1024;
42
/// Tunables for a [`WalWriter`].
#[derive(Debug, Clone)]
pub struct WalWriterConfig {
    /// Capacity, in bytes, requested from `AlignedBuf::new` for the write buffer.
    /// A record whose header plus payload exceeds this is rejected by `append`.
    pub write_buffer_size: usize,

    /// Alignment requested from `AlignedBuf::new` for the write buffer.
    pub alignment: usize,

    /// When `true`, the file is opened with `O_DIRECT` and flushes write the
    /// buffer's aligned slice instead of its plain slice.
    pub use_direct_io: bool,

    /// Explicit double-write-buffer mode. `None` defers to
    /// `DwbMode::default_for_parent(use_direct_io)`.
    pub dwb_mode: Option<DwbMode>,
}
61
62impl Default for WalWriterConfig {
63 fn default() -> Self {
64 Self {
65 write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
66 alignment: DEFAULT_ALIGNMENT,
67 use_direct_io: true,
68 dwb_mode: None,
69 }
70 }
71}
72
73fn resolve_dwb_mode(config: &WalWriterConfig) -> DwbMode {
74 config
75 .dwb_mode
76 .unwrap_or_else(|| DwbMode::default_for_parent(config.use_direct_io))
77}
78
79fn open_dwb_for(
80 config: &WalWriterConfig,
81 path: &Path,
82) -> Option<crate::double_write::DoubleWriteBuffer> {
83 let mode = resolve_dwb_mode(config);
84 if mode == DwbMode::Off {
85 return None;
86 }
87 let dwb_path = path.with_extension("dwb");
88 match crate::double_write::DoubleWriteBuffer::open(&dwb_path, mode) {
89 Ok(d) => Some(d),
90 Err(e) => {
91 tracing::warn!(
92 path = %dwb_path.display(),
93 error = %e,
94 mode = ?mode,
95 "failed to open DWB — torn-write protection disabled for this writer"
96 );
97 None
98 }
99 }
100}
101
/// Buffered writer for a single WAL segment file.
///
/// Records are staged in an aligned in-memory buffer and written to the file
/// when the buffer cannot hold the next record or when `sync` is called.
pub struct WalWriter {
    /// Open handle to the WAL segment file.
    file: File,

    /// Staging buffer that accumulates record bytes between flushes.
    buffer: AlignedBuf,

    /// Byte offset in `file` at which the next flush will write.
    file_offset: u64,

    /// LSN that the next appended record will receive.
    next_lsn: AtomicU64,

    /// Once `true`, `append` fails with `WalError::Sealed`.
    sealed: bool,

    /// Configuration the writer was opened with.
    config: WalWriterConfig,

    /// Key ring whose current key is handed to `WalRecord::new` for each
    /// record; `None` until `set_encryption_ring` is called.
    encryption_ring: Option<crate::crypto::KeyRing>,

    /// Preamble staged by `set_encryption_ring`; its bytes are passed to
    /// `WalRecord::new` for every appended record.
    segment_preamble: Option<SegmentPreamble>,

    /// Double-write buffer, if one is attached. `None` when the mode is off,
    /// when opening it failed, or after it was detached following an error.
    double_write: Option<crate::double_write::DoubleWriteBuffer>,
}
134
135impl WalWriter {
136 pub fn open(path: &Path, config: WalWriterConfig) -> Result<Self> {
138 let mut opts = OpenOptions::new();
139 opts.create(true).write(true).append(false);
140
141 if config.use_direct_io {
142 opts.custom_flags(libc::O_DIRECT);
144 }
145
146 let file = opts.open(path)?;
147
148 let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
149
150 let (file_offset, next_lsn) = if path.exists() && std::fs::metadata(path)?.len() > 0 {
152 let info = crate::recovery::recover(path)?;
153 (info.end_offset, info.next_lsn())
154 } else {
155 (0, 1)
156 };
157
158 let double_write = open_dwb_for(&config, path);
159
160 Ok(Self {
161 file,
162 buffer,
163 file_offset,
164 next_lsn: AtomicU64::new(next_lsn),
165 sealed: false,
166 config,
167 encryption_ring: None,
168 segment_preamble: None,
169 double_write,
170 })
171 }
172
173 pub fn set_encryption_key(&mut self, key: crate::crypto::WalEncryptionKey) -> Result<()> {
180 self.set_encryption_ring(crate::crypto::KeyRing::new(key))
181 }
182
183 pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) -> Result<()> {
189 if self.file_offset != 0 || !self.buffer.is_empty() {
190 return Err(WalError::EncryptionError {
191 detail: "set_encryption_ring must be called before writing any records".into(),
192 });
193 }
194 let epoch = *ring.current().epoch();
195 let preamble = SegmentPreamble::new_wal(epoch);
196 let preamble_bytes = preamble.to_bytes();
197
198 self.buffer.write(&preamble_bytes);
201
202 self.encryption_ring = Some(ring);
203 self.segment_preamble = Some(preamble);
204 Ok(())
205 }
206
    /// Returns the installed key ring, or `None` if encryption has not been
    /// configured on this writer.
    pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
        self.encryption_ring.as_ref()
    }
211
    /// Returns the segment preamble staged by `set_encryption_ring`, or `None`
    /// if no key ring has been installed.
    pub fn segment_preamble(&self) -> Option<&SegmentPreamble> {
        self.segment_preamble.as_ref()
    }
216
217 pub fn open_with_start_lsn(
223 path: &Path,
224 config: WalWriterConfig,
225 start_lsn: u64,
226 ) -> Result<Self> {
227 let mut opts = OpenOptions::new();
228 opts.create(true).write(true).append(false);
229
230 if config.use_direct_io {
231 opts.custom_flags(libc::O_DIRECT);
232 }
233
234 let file = opts.open(path)?;
235 let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
236
237 let double_write = open_dwb_for(&config, path);
238
239 Ok(Self {
240 file,
241 buffer,
242 file_offset: 0,
243 next_lsn: AtomicU64::new(start_lsn),
244 sealed: false,
245 config,
246 encryption_ring: None,
247 segment_preamble: None,
248 double_write,
249 })
250 }
251
252 pub fn open_without_direct_io(path: &Path) -> Result<Self> {
254 Self::open(
255 path,
256 WalWriterConfig {
257 use_direct_io: false,
258 ..Default::default()
259 },
260 )
261 }
262
263 pub fn append(
268 &mut self,
269 record_type: u32,
270 tenant_id: u64,
271 vshard_id: u32,
272 payload: &[u8],
273 ) -> Result<u64> {
274 if self.sealed {
275 return Err(WalError::Sealed);
276 }
277
278 let lsn = self.next_lsn.fetch_add(1, Ordering::Relaxed);
279 let preamble_bytes = self.segment_preamble.as_ref().map(|p| p.to_bytes());
280 let record = WalRecord::new(
281 record_type,
282 lsn,
283 tenant_id,
284 vshard_id,
285 payload.to_vec(),
286 self.encryption_ring.as_ref().map(|r| r.current()),
287 preamble_bytes.as_ref(),
288 )?;
289
290 if let Some(dwb) = &mut self.double_write
299 && let Err(e) = dwb.write_record_deferred(&record)
300 {
301 tracing::warn!(
302 lsn = lsn,
303 error = %e,
304 "DWB write failed — torn-write protection degraded, detaching DWB"
305 );
306 self.double_write = None;
307 }
308
309 let header_bytes = record.header.to_bytes();
310 let total_size = HEADER_SIZE + record.payload.len();
311
312 if self.buffer.remaining() < total_size {
314 self.flush_buffer()?;
315 }
316
317 if total_size > self.buffer.capacity() {
320 return Err(WalError::PayloadTooLarge {
321 size: record.payload.len(),
322 max: self.buffer.capacity() - HEADER_SIZE,
323 });
324 }
325
326 self.buffer.write(&header_bytes);
327 self.buffer.write(&record.payload);
328
329 Ok(lsn)
330 }
331
332 pub fn sync(&mut self) -> Result<()> {
338 if self.buffer.is_empty() {
339 return Ok(());
340 }
341 if let Some(dwb) = &mut self.double_write
346 && let Err(e) = dwb.flush()
347 {
348 tracing::warn!(
349 error = %e,
350 "DWB flush failed — torn-write protection lost for this batch, detaching DWB"
351 );
352 self.double_write = None;
353 }
354 self.flush_buffer()?;
355 self.file.sync_all()?;
356 Ok(())
357 }
358
359 pub fn seal(&mut self) -> Result<()> {
363 self.sync()?;
364 self.sealed = true;
365 Ok(())
366 }
367
    /// Returns the LSN that the next appended record will receive.
    pub fn next_lsn(&self) -> u64 {
        self.next_lsn.load(Ordering::Relaxed)
    }
372
    /// Returns the file offset at which the next flush will write. Bytes still
    /// held in the write buffer are not included.
    pub fn file_offset(&self) -> u64 {
        self.file_offset
    }
377
378 fn flush_buffer(&mut self) -> Result<()> {
380 if self.buffer.is_empty() {
381 return Ok(());
382 }
383
384 let data = if self.config.use_direct_io {
385 self.buffer.as_aligned_slice()
387 } else {
388 self.buffer.as_slice()
390 };
391
392 #[cfg(unix)]
394 {
395 use std::os::unix::io::AsRawFd;
396 let fd = self.file.as_raw_fd();
397 let mut remaining = data;
398 let mut write_offset = self.file_offset;
399 while !remaining.is_empty() {
400 let written = unsafe {
401 libc::pwrite(
402 fd,
403 remaining.as_ptr() as *const libc::c_void,
404 remaining.len(),
405 write_offset as libc::off_t,
406 )
407 };
408 if written < 0 {
409 return Err(WalError::Io(std::io::Error::last_os_error()));
410 }
411 let n = written as usize;
412 remaining = &remaining[n..];
413 write_offset += n as u64;
414 }
415 }
416
417 self.file_offset += data.len() as u64;
418 self.buffer.clear();
419 Ok(())
420 }
421}
422
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Creates a buffered-I/O writer on a fresh `test.wal` inside a temporary
    /// directory. The directory guard is returned so it outlives the writer.
    fn scratch_writer() -> (tempfile::TempDir, WalWriter) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");
        let writer = WalWriter::open_without_direct_io(&path).unwrap();
        (dir, writer)
    }

    /// A single append gets LSN 1, and syncing moves the file offset forward.
    #[test]
    fn write_and_sync_single_record() {
        let (_dir, mut writer) = scratch_writer();

        let lsn = writer
            .append(RecordType::Put as u32, 1, 0, b"hello")
            .unwrap();
        assert_eq!(lsn, 1);

        writer.sync().unwrap();
        assert!(writer.file_offset() > 0);
    }

    /// Consecutive appends receive consecutive LSNs starting at 1.
    #[test]
    fn lsn_increments() {
        let (_dir, mut writer) = scratch_writer();

        let payloads: [&[u8]; 3] = [b"first", b"second", b"third"];
        let lsns: Vec<u64> = payloads
            .into_iter()
            .map(|payload| {
                writer
                    .append(RecordType::Put as u32, 1, 0, payload)
                    .unwrap()
            })
            .collect();

        assert_eq!(lsns[0], 1);
        assert_eq!(lsns[1], 2);
        assert_eq!(lsns[2], 3);
    }

    /// Once sealed, the writer refuses further appends with `WalError::Sealed`.
    #[test]
    fn sealed_writer_rejects_writes() {
        let (_dir, mut writer) = scratch_writer();
        writer.seal().unwrap();

        let outcome = writer.append(RecordType::Put as u32, 1, 0, b"rejected");
        assert!(matches!(outcome, Err(WalError::Sealed)));
    }
}
478}