1use std::fs::{File, OpenOptions};
22use std::os::unix::fs::OpenOptionsExt;
23use std::path::Path;
24use std::sync::atomic::{AtomicU64, Ordering};
25
26use crate::align::{AlignedBuf, DEFAULT_ALIGNMENT};
27use crate::error::{Result, WalError};
28use crate::record::{HEADER_SIZE, WalRecord};
29
/// Default value for [`WalWriterConfig::write_buffer_size`]: 2 MiB (2 × 2^20).
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 2 << 20;
38
/// Tunables consumed by `WalWriter::open` and `WalWriter::open_with_start_lsn`.
#[derive(Clone, Debug)]
pub struct WalWriterConfig {
    /// Size handed to `AlignedBuf::new` when the writer allocates its
    /// in-memory staging buffer.
    pub write_buffer_size: usize,
    /// Alignment handed to `AlignedBuf::new` for that same buffer.
    pub alignment: usize,
    /// When `true` the WAL file is opened with `O_DIRECT` and flushes write the
    /// buffer's aligned slice; when `false` the plain slice is written.
    pub use_direct_io: bool,
}
52
53impl Default for WalWriterConfig {
54 fn default() -> Self {
55 Self {
56 write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
57 alignment: DEFAULT_ALIGNMENT,
58 use_direct_io: true,
59 }
60 }
61}
62
/// Buffered, append-only writer for a single WAL file.
///
/// Records are staged in `buffer` by `append` and reach the file only when the
/// buffer is flushed (when it runs out of room, or on `sync`/`seal`).
pub struct WalWriter {
    /// Handle to the WAL file; opened for writing, with `O_DIRECT` when
    /// `config.use_direct_io` is set.
    file: File,

    /// Staging buffer that `append` writes record headers and payloads into.
    buffer: AlignedBuf,

    /// File offset at which the next buffer flush will write. Advanced by the
    /// buffer's length after every flush.
    file_offset: u64,

    /// LSN that the next call to `append` will assign; incremented by one per
    /// call that gets past the sealed check.
    next_lsn: AtomicU64,

    /// Set by `seal`; once `true`, `append` returns `WalError::Sealed`.
    sealed: bool,

    /// Configuration the writer was opened with.
    config: WalWriterConfig,

    /// Optional key ring. `None` until one of the `set_encryption_*` methods is
    /// called; when present, its current key is passed to `WalRecord::new`.
    encryption_ring: Option<crate::crypto::KeyRing>,

    /// Optional double-write buffer stored next to the WAL file (same path with
    /// a `dwb` extension). `None` if it could not be opened, and reset to
    /// `None` after any failed DWB write or flush.
    double_write: Option<crate::double_write::DoubleWriteBuffer>,
}
90
91impl WalWriter {
92 pub fn open(path: &Path, config: WalWriterConfig) -> Result<Self> {
94 let mut opts = OpenOptions::new();
95 opts.create(true).write(true).append(false);
96
97 if config.use_direct_io {
98 opts.custom_flags(libc::O_DIRECT);
100 }
101
102 let file = opts.open(path)?;
103
104 let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
105
106 let (file_offset, next_lsn) = if path.exists() && std::fs::metadata(path)?.len() > 0 {
108 let info = crate::recovery::recover(path)?;
109 (info.end_offset, info.next_lsn())
110 } else {
111 (0, 1)
112 };
113
114 let dwb_path = path.with_extension("dwb");
116 let double_write = crate::double_write::DoubleWriteBuffer::open(&dwb_path).ok();
117
118 Ok(Self {
119 file,
120 buffer,
121 file_offset,
122 next_lsn: AtomicU64::new(next_lsn),
123 sealed: false,
124 config,
125 encryption_ring: None,
126 double_write,
127 })
128 }
129
130 pub fn set_encryption_key(&mut self, key: crate::crypto::WalEncryptionKey) {
133 self.encryption_ring = Some(crate::crypto::KeyRing::new(key));
134 }
135
136 pub fn set_encryption_ring(&mut self, ring: crate::crypto::KeyRing) {
138 self.encryption_ring = Some(ring);
139 }
140
141 pub fn encryption_ring(&self) -> Option<&crate::crypto::KeyRing> {
143 self.encryption_ring.as_ref()
144 }
145
146 pub fn open_with_start_lsn(
152 path: &Path,
153 config: WalWriterConfig,
154 start_lsn: u64,
155 ) -> Result<Self> {
156 let mut opts = OpenOptions::new();
157 opts.create(true).write(true).append(false);
158
159 if config.use_direct_io {
160 opts.custom_flags(libc::O_DIRECT);
161 }
162
163 let file = opts.open(path)?;
164 let buffer = AlignedBuf::new(config.write_buffer_size, config.alignment)?;
165
166 let dwb_path = path.with_extension("dwb");
167 let double_write = crate::double_write::DoubleWriteBuffer::open(&dwb_path).ok();
168
169 Ok(Self {
170 file,
171 buffer,
172 file_offset: 0,
173 next_lsn: AtomicU64::new(start_lsn),
174 sealed: false,
175 config,
176 encryption_ring: None,
177 double_write,
178 })
179 }
180
181 pub fn open_without_direct_io(path: &Path) -> Result<Self> {
183 Self::open(
184 path,
185 WalWriterConfig {
186 use_direct_io: false,
187 ..Default::default()
188 },
189 )
190 }
191
192 pub fn append(
197 &mut self,
198 record_type: u16,
199 tenant_id: u32,
200 vshard_id: u16,
201 payload: &[u8],
202 ) -> Result<u64> {
203 if self.sealed {
204 return Err(WalError::Sealed);
205 }
206
207 let lsn = self.next_lsn.fetch_add(1, Ordering::Relaxed);
208 let record = WalRecord::new(
209 record_type,
210 lsn,
211 tenant_id,
212 vshard_id,
213 payload.to_vec(),
214 self.encryption_ring.as_ref().map(|r| r.current()),
215 )?;
216
217 if let Some(dwb) = &mut self.double_write
226 && let Err(e) = dwb.write_record_deferred(&record)
227 {
228 tracing::warn!(
229 lsn = lsn,
230 error = %e,
231 "DWB write failed — torn-write protection degraded, detaching DWB"
232 );
233 self.double_write = None;
234 }
235
236 let header_bytes = record.header.to_bytes();
237 let total_size = HEADER_SIZE + record.payload.len();
238
239 if self.buffer.remaining() < total_size {
241 self.flush_buffer()?;
242 }
243
244 if total_size > self.buffer.capacity() {
247 return Err(WalError::PayloadTooLarge {
248 size: record.payload.len(),
249 max: self.buffer.capacity() - HEADER_SIZE,
250 });
251 }
252
253 self.buffer.write(&header_bytes);
254 self.buffer.write(&record.payload);
255
256 Ok(lsn)
257 }
258
259 pub fn sync(&mut self) -> Result<()> {
265 if self.buffer.is_empty() {
266 return Ok(());
267 }
268 if let Some(dwb) = &mut self.double_write
273 && let Err(e) = dwb.flush()
274 {
275 tracing::warn!(
276 error = %e,
277 "DWB flush failed — torn-write protection lost for this batch, detaching DWB"
278 );
279 self.double_write = None;
280 }
281 self.flush_buffer()?;
282 self.file.sync_all()?;
283 Ok(())
284 }
285
286 pub fn seal(&mut self) -> Result<()> {
290 self.sync()?;
291 self.sealed = true;
292 Ok(())
293 }
294
295 pub fn next_lsn(&self) -> u64 {
297 self.next_lsn.load(Ordering::Relaxed)
298 }
299
300 pub fn file_offset(&self) -> u64 {
302 self.file_offset
303 }
304
305 fn flush_buffer(&mut self) -> Result<()> {
307 if self.buffer.is_empty() {
308 return Ok(());
309 }
310
311 let data = if self.config.use_direct_io {
312 self.buffer.as_aligned_slice()
314 } else {
315 self.buffer.as_slice()
317 };
318
319 #[cfg(unix)]
321 {
322 use std::os::unix::io::AsRawFd;
323 let fd = self.file.as_raw_fd();
324 let written = unsafe {
325 libc::pwrite(
326 fd,
327 data.as_ptr() as *const libc::c_void,
328 data.len(),
329 self.file_offset as libc::off_t,
330 )
331 };
332 if written < 0 {
333 return Err(WalError::Io(std::io::Error::last_os_error()));
334 }
335 }
336
337 self.file_offset += self.buffer.len() as u64;
338 self.buffer.clear();
339 Ok(())
340 }
341}
342
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;

    /// Opens a writer (direct I/O off) on `test.wal` inside a fresh temp dir.
    /// The `TempDir` guard is returned so the directory outlives the writer.
    fn temp_writer() -> (tempfile::TempDir, WalWriter) {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");
        let writer = WalWriter::open_without_direct_io(&wal_path).unwrap();
        (dir, writer)
    }

    /// Appends a `Put` record for tenant 1 / vshard 0 with the given payload.
    fn put(writer: &mut WalWriter, payload: &[u8]) -> Result<u64> {
        writer.append(RecordType::Put as u16, 1, 0, payload)
    }

    #[test]
    fn write_and_sync_single_record() {
        let (_dir, mut writer) = temp_writer();

        // A brand-new WAL hands out LSN 1 first.
        assert_eq!(put(&mut writer, b"hello").unwrap(), 1);

        // Syncing must push the staged record into the file.
        writer.sync().unwrap();
        assert!(writer.file_offset() > 0);
    }

    #[test]
    fn lsn_increments() {
        let (_dir, mut writer) = temp_writer();

        let payloads: [&[u8]; 3] = [b"first", b"second", b"third"];
        let lsns: Vec<u64> = payloads
            .into_iter()
            .map(|payload| put(&mut writer, payload).unwrap())
            .collect();

        assert_eq!(lsns, vec![1, 2, 3]);
    }

    #[test]
    fn sealed_writer_rejects_writes() {
        let (_dir, mut writer) = temp_writer();
        writer.seal().unwrap();

        let outcome = put(&mut writer, b"rejected");
        assert!(matches!(outcome, Err(WalError::Sealed)));
    }
}