1use std::fs::{File, OpenOptions};
24use std::path::Path;
25use std::sync::atomic::{AtomicU64, Ordering};
26
27#[cfg(not(target_arch = "wasm32"))]
28use std::os::unix::fs::OpenOptionsExt as _;
29
30use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT};
31use crate::double_write::DwbMode;
32use crate::error::{Result, WalError};
33use crate::preamble::SegmentPreamble;
34use crate::record::{HEADER_SIZE, WalRecord};
35
/// Default value for [`WalWriterConfig::write_buffer_size`]: 2 MiB.
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 2 << 20;
44
/// Tunables consumed by [`WalWriter`] when it opens a WAL file.
#[derive(Debug, Clone)]
pub struct WalWriterConfig {
    /// Size passed to `AlignedBuf::new` when the writer allocates its
    /// in-memory write buffer.
    pub write_buffer_size: usize,

    /// Alignment passed to `AlignedBuf::new` for the write buffer.
    pub alignment: usize,

    /// When `true`, the WAL file is opened with `O_DIRECT` (on non-wasm32
    /// targets) and flushes write the buffer's aligned slice rather than its
    /// plain slice.
    pub use_direct_io: bool,

    /// Explicit double-write-buffer mode. `None` defers to
    /// `DwbMode::default_for_parent(use_direct_io)`.
    pub dwb_mode: Option<DwbMode>,
}
63
64impl Default for WalWriterConfig {
65 fn default() -> Self {
66 Self {
67 write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
68 alignment: DEFAULT_ALIGNMENT,
69 use_direct_io: true,
70 dwb_mode: None,
71 }
72 }
73}
74
75fn resolve_dwb_mode(config: &WalWriterConfig) -> DwbMode {
76 config
77 .dwb_mode
78 .unwrap_or_else(|| DwbMode::default_for_parent(config.use_direct_io))
79}
80
81fn open_dwb_for(
82 config: &WalWriterConfig,
83 path: &Path,
84) -> Option<crate::double_write::DoubleWriteBuffer> {
85 let mode = resolve_dwb_mode(config);
86 if mode == DwbMode::Off {
87 return None;
88 }
89 let dwb_path = path.with_extension("dwb");
90 match crate::double_write::DoubleWriteBuffer::open(&dwb_path, mode) {
91 Ok(d) => Some(d),
92 Err(e) => {
93 tracing::warn!(
94 path = %dwb_path.display(),
95 error = %e,
96 mode = ?mode,
97 "failed to open DWB — torn-write protection disabled for this writer"
98 );
99 None
100 }
101 }
102}
103
/// Buffered writer for a single WAL file.
///
/// Records are staged in an aligned in-memory buffer and written to the file
/// at an explicitly tracked offset when the buffer is flushed.
pub struct WalWriter {
    /// Handle of the WAL file; flushes write to it and `sync` calls
    /// `sync_all` on it.
    file: File,

    /// Staging buffer that record headers and payloads are copied into
    /// before being flushed to `file`.
    buffer: AlignedBuf,

    /// File offset at which the next flush starts writing; advanced by the
    /// number of bytes each flush writes.
    file_offset: u64,

    /// LSN that the next successful `append` hands out.
    next_lsn: AtomicU64,

    /// Set by `seal`; once `true`, `append` returns `WalError::Sealed`.
    sealed: bool,

    /// Configuration the writer was opened with.
    config: WalWriterConfig,

    /// Key ring installed via `set_encryption_ring`; its current key is
    /// passed to `WalRecord::new` for every appended record.
    encryption_ring: Option<crate::crypto::KeyRing>,

    /// Preamble created by `set_encryption_ring`; its bytes are passed to
    /// `WalRecord::new` for every appended record.
    segment_preamble: Option<SegmentPreamble>,

    /// Optional double-write buffer. Reset to `None` when a DWB write or
    /// flush fails, after which the writer continues without it.
    double_write: Option<crate::double_write::DoubleWriteBuffer>,
}
136
137impl WalWriter {
138 pub fn open(path: &Path, config: WalWriterConfig) -> Result<Self> {
140 let mut opts = OpenOptions::new();
141 opts.create(true).write(true).append(false);
142
143 #[cfg(not(target_arch = "wasm32"))]
144 if config.use_direct_io {
145 opts.custom_flags(libc::O_DIRECT);
147 }
148
149 let file = opts.open(path)?;
150
151 let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
152
153 let (file_offset, next_lsn) = if path.exists() && std::fs::metadata(path)?.len() > 0 {
155 let info = crate::recovery::recover(path)?;
156 (info.end_offset, info.next_lsn())
157 } else {
158 (0, 1)
159 };
160
161 let double_write = open_dwb_for(&config, path);
162
163 Ok(Self {
164 file,
165 buffer,
166 file_offset,
167 next_lsn: AtomicU64::new(next_lsn),
168 sealed: false,
169 config,
170 encryption_ring: None,
171 segment_preamble: None,
172 double_write,
173 })
174 }
175
176 pub fn set_encryption_key(&mut self, key: crate::crypto::WalEncryptionKey) -> Result<()> {
183 self.set_encryption_ring(crate::crypto::KeyRing::new(key))
184 }
185
186 pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) -> Result<()> {
192 if self.file_offset != 0 || !self.buffer.is_empty() {
193 return Err(WalError::EncryptionError {
194 detail: "set_encryption_ring must be called before writing any records".into(),
195 });
196 }
197 let epoch = *ring.current().epoch();
198 let preamble = SegmentPreamble::new_wal(epoch);
199 let preamble_bytes = preamble.to_bytes();
200
201 self.buffer.write(&preamble_bytes);
204
205 self.encryption_ring = Some(ring);
206 self.segment_preamble = Some(preamble);
207 Ok(())
208 }
209
210 pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
212 self.encryption_ring.as_ref()
213 }
214
215 pub fn segment_preamble(&self) -> Option<&SegmentPreamble> {
217 self.segment_preamble.as_ref()
218 }
219
220 pub fn open_with_start_lsn(
226 path: &Path,
227 config: WalWriterConfig,
228 start_lsn: u64,
229 ) -> Result<Self> {
230 let mut opts = OpenOptions::new();
231 opts.create(true).write(true).append(false);
232
233 #[cfg(not(target_arch = "wasm32"))]
234 if config.use_direct_io {
235 opts.custom_flags(libc::O_DIRECT);
236 }
237
238 let file = opts.open(path)?;
239 let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
240
241 let double_write = open_dwb_for(&config, path);
242
243 Ok(Self {
244 file,
245 buffer,
246 file_offset: 0,
247 next_lsn: AtomicU64::new(start_lsn),
248 sealed: false,
249 config,
250 encryption_ring: None,
251 segment_preamble: None,
252 double_write,
253 })
254 }
255
256 pub fn open_without_direct_io(path: &Path) -> Result<Self> {
258 Self::open(
259 path,
260 WalWriterConfig {
261 use_direct_io: false,
262 ..Default::default()
263 },
264 )
265 }
266
267 pub fn append(
275 &mut self,
276 record_type: u32,
277 tenant_id: u64,
278 vshard_id: u32,
279 database_id: u64,
280 payload: &[u8],
281 ) -> Result<u64> {
282 if self.sealed {
283 return Err(WalError::Sealed);
284 }
285
286 let lsn = self.next_lsn.fetch_add(1, Ordering::Relaxed);
287 let preamble_bytes = self.segment_preamble.as_ref().map(|p| p.to_bytes());
288 let record = WalRecord::new(
289 record_type,
290 lsn,
291 tenant_id,
292 vshard_id,
293 database_id,
294 payload.to_vec(),
295 self.encryption_ring.as_ref().map(|r| r.current()),
296 preamble_bytes.as_ref(),
297 )?;
298
299 if let Some(dwb) = &mut self.double_write
308 && let Err(e) = dwb.write_record_deferred(&record)
309 {
310 tracing::warn!(
311 lsn = lsn,
312 error = %e,
313 "DWB write failed — torn-write protection degraded, detaching DWB"
314 );
315 self.double_write = None;
316 }
317
318 let header_bytes = record.header.to_bytes();
319 let total_size = HEADER_SIZE + record.payload.len();
320
321 if self.buffer.remaining() < total_size {
323 self.flush_buffer()?;
324 }
325
326 if total_size > self.buffer.capacity() {
329 return Err(WalError::PayloadTooLarge {
330 size: record.payload.len(),
331 max: self.buffer.capacity() - HEADER_SIZE,
332 });
333 }
334
335 self.buffer.write(&header_bytes);
336 self.buffer.write(&record.payload);
337
338 Ok(lsn)
339 }
340
341 pub fn sync(&mut self) -> Result<()> {
347 if self.buffer.is_empty() {
348 return Ok(());
349 }
350 if let Some(dwb) = &mut self.double_write
355 && let Err(e) = dwb.flush()
356 {
357 tracing::warn!(
358 error = %e,
359 "DWB flush failed — torn-write protection lost for this batch, detaching DWB"
360 );
361 self.double_write = None;
362 }
363 self.flush_buffer()?;
364 self.file.sync_all()?;
365 Ok(())
366 }
367
368 pub fn seal(&mut self) -> Result<()> {
372 self.sync()?;
373 self.sealed = true;
374 Ok(())
375 }
376
377 pub fn next_lsn(&self) -> u64 {
379 self.next_lsn.load(Ordering::Relaxed)
380 }
381
382 pub fn file_offset(&self) -> u64 {
384 self.file_offset
385 }
386
387 fn flush_buffer(&mut self) -> Result<()> {
389 if self.buffer.is_empty() {
390 return Ok(());
391 }
392
393 let data = if self.config.use_direct_io {
394 self.buffer.as_aligned_slice()
396 } else {
397 self.buffer.as_slice()
399 };
400
401 #[cfg(unix)]
403 {
404 use std::os::unix::io::AsRawFd;
405 let fd = self.file.as_raw_fd();
406 let mut remaining = data;
407 let mut write_offset = self.file_offset;
408 while !remaining.is_empty() {
409 let written = unsafe {
410 libc::pwrite(
411 fd,
412 remaining.as_ptr() as *const libc::c_void,
413 remaining.len(),
414 write_offset as libc::off_t,
415 )
416 };
417 if written < 0 {
418 return Err(WalError::Io(std::io::Error::last_os_error()));
419 }
420 let n = written as usize;
421 remaining = &remaining[n..];
422 write_offset += n as u64;
423 }
424 }
425
426 self.file_offset += data.len() as u64;
427 self.buffer.clear();
428 Ok(())
429 }
430}
431
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Opens a writer without direct I/O on `test.wal` inside a fresh temp
    /// directory. The directory guard is returned so it outlives the writer.
    fn scratch_writer() -> (tempfile::TempDir, WalWriter) {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");
        let writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
        (dir, writer)
    }

    /// A single append gets LSN 1 and, once synced, moves the file offset.
    #[test]
    fn write_and_sync_single_record() {
        let (_dir, mut writer) = scratch_writer();

        let lsn = writer
            .append(RecordType::Put as u32, 1, 0, 0, b"hello")
            .unwrap();
        assert_eq!(lsn, 1);

        writer.sync().unwrap();
        assert_ne!(writer.file_offset(), 0);
    }

    /// Consecutive appends receive LSNs 1, 2, 3.
    #[test]
    fn lsn_increments() {
        let (_dir, mut writer) = scratch_writer();

        let payloads: [&[u8]; 3] = [b"first", b"second", b"third"];
        for (expected_lsn, payload) in (1u64..).zip(payloads) {
            let lsn = writer
                .append(RecordType::Put as u32, 1, 0, 0, payload)
                .unwrap();
            assert_eq!(lsn, expected_lsn);
        }
    }

    /// After `seal`, `append` fails with `WalError::Sealed`.
    #[test]
    fn sealed_writer_rejects_writes() {
        let (_dir, mut writer) = scratch_writer();
        writer.seal().unwrap();

        let outcome = writer.append(RecordType::Put as u32, 1, 0, 0, b"rejected");
        assert!(matches!(outcome, Err(WalError::Sealed)));
    }
}
487}