1use crate::record::WalRecord;
3use crate::SyncMode;
4use rustlite_core::{Error, Result};
5use std::fs::{File, OpenOptions};
6use std::io::{BufWriter, Read, Write};
7use std::path::PathBuf;
8use tracing::{debug, info, instrument};
9
10const WAL_MAGIC_HEADER: [u8; 4] = *b"RLWL";
12
13const WAL_FORMAT_VERSION: u16 = 1;
15
16#[derive(Debug, Clone)]
18pub struct WalHeader {
19 pub magic: [u8; 4],
21 pub version: u16,
23}
24
25impl WalHeader {
26 pub const SIZE: usize = 6; pub fn new() -> Self {
31 Self {
32 magic: WAL_MAGIC_HEADER,
33 version: WAL_FORMAT_VERSION,
34 }
35 }
36
37 pub fn write_to<W: Write>(&self, writer: &mut W) -> Result<()> {
39 writer.write_all(&self.magic)?;
40 writer.write_all(&self.version.to_le_bytes())?;
41 Ok(())
42 }
43
44 pub fn read_from<R: Read>(reader: &mut R) -> Result<Self> {
46 let mut magic = [0u8; 4];
47 reader.read_exact(&mut magic)?;
48
49 if magic != WAL_MAGIC_HEADER {
50 return Err(Error::Corruption(format!(
51 "Invalid WAL magic: expected {:?}, got {:?}",
52 WAL_MAGIC_HEADER, magic
53 )));
54 }
55
56 let mut version_bytes = [0u8; 2];
57 reader.read_exact(&mut version_bytes)?;
58 let version = u16::from_le_bytes(version_bytes);
59
60 if version > WAL_FORMAT_VERSION {
61 return Err(Error::Corruption(format!(
62 "Unsupported WAL version: {} (current: {})",
63 version, WAL_FORMAT_VERSION
64 )));
65 }
66
67 Ok(Self { magic, version })
68 }
69}
70pub struct WalWriter {
71 file: BufWriter<File>,
72 current_segment: PathBuf,
73 current_size: u64,
74 max_segment_size: u64,
75 sync_mode: SyncMode,
76 sequence: u64,
77 wal_dir: PathBuf,
78}
79
80impl WalWriter {
81 #[instrument(skip(wal_dir), fields(wal_dir = ?wal_dir, max_segment_size = max_segment_size))]
82 pub fn new(wal_dir: &PathBuf, max_segment_size: u64, sync_mode: SyncMode) -> Result<Self> {
83 info!("Creating WAL writer");
84
85 std::fs::create_dir_all(wal_dir)
87 .map_err(|e| Error::Storage(format!("Failed to create WAL directory: {}", e)))?;
88
89 let starting_sequence = Self::find_max_sequence(wal_dir)?;
91
92 let segment_name = format!("wal-{:016x}.log", starting_sequence);
94 let segment_path = wal_dir.join(&segment_name);
95
96 let mut file = OpenOptions::new()
98 .create(true)
99 .read(true)
100 .append(true)
101 .open(&segment_path)
102 .map_err(|e| Error::Storage(format!("Failed to open WAL segment: {}", e)))?;
103
104 let current_size = file.metadata().map(|m| m.len()).unwrap_or(0);
106
107 if current_size == 0 {
109 let header = WalHeader::new();
110 header.write_to(&mut file)?;
111 file.flush()?;
112 debug!("Wrote WAL header to new segment");
113 }
114
115 let actual_size = file.metadata().map(|m| m.len()).unwrap_or(0);
117
118 Ok(Self {
119 file: BufWriter::new(file),
120 current_segment: segment_path,
121 current_size: actual_size,
122 max_segment_size,
123 sync_mode,
124 sequence: starting_sequence,
125 wal_dir: wal_dir.clone(),
126 })
127 }
128
129 fn find_max_sequence(wal_dir: &PathBuf) -> Result<u64> {
131 let mut max_seq = 0u64;
132
133 if let Ok(entries) = std::fs::read_dir(wal_dir) {
134 for entry in entries.flatten() {
135 if let Some(name) = entry.file_name().to_str() {
136 if name.starts_with("wal-") && name.ends_with(".log") {
137 if let Some(seq_str) = name
139 .strip_prefix("wal-")
140 .and_then(|s| s.strip_suffix(".log"))
141 {
142 if let Ok(seq) = u64::from_str_radix(seq_str, 16) {
143 max_seq = max_seq.max(seq);
144 }
145 }
146 }
147 }
148 }
149 }
150
151 Ok(max_seq)
152 }
153
154 #[instrument(skip(self, record), fields(record_type = ?record))]
155 pub fn append(&mut self, record: WalRecord) -> Result<u64> {
156 debug!(sequence = self.sequence, "Appending WAL record");
157
158 let encoded = record.encode()?;
160 let record_size = encoded.len() as u64;
161
162 if self.current_size + record_size > self.max_segment_size {
164 self.rotate_segment()?;
165 }
166
167 self.file
169 .write_all(&encoded)
170 .map_err(|e| Error::Storage(format!("Failed to write WAL record: {}", e)))?;
171
172 self.current_size += record_size;
173 self.sequence += 1;
174
175 if matches!(self.sync_mode, SyncMode::Sync) {
177 self.sync()?;
178 }
179
180 Ok(self.sequence)
181 }
182
183 pub fn sync(&mut self) -> Result<()> {
184 self.file
185 .flush()
186 .map_err(|e| Error::Storage(format!("Failed to flush WAL: {}", e)))?;
187
188 self.file
189 .get_ref()
190 .sync_all()
191 .map_err(|e| Error::Storage(format!("Failed to sync WAL: {}", e)))?;
192
193 Ok(())
194 }
195
196 fn rotate_segment(&mut self) -> Result<()> {
197 self.sync()?;
199
200 self.sequence += 1;
202
203 let segment_name = format!("wal-{:016x}.log", self.sequence);
205 let new_segment = self.wal_dir.join(&segment_name);
206
207 let mut file = OpenOptions::new()
209 .create(true)
210 .append(true)
211 .open(&new_segment)
212 .map_err(|e| Error::Storage(format!("Failed to create new segment: {}", e)))?;
213
214 let header = WalHeader::new();
216 header.write_to(&mut file)?;
217 file.flush()?;
218 let header_size = WalHeader::SIZE as u64;
219
220 debug!(segment = ?new_segment, "Rotated to new WAL segment");
221
222 self.file = BufWriter::new(file);
224 self.current_segment = new_segment;
225 self.current_size = header_size;
226
227 Ok(())
228 }
229
230 pub fn current_segment_path(&self) -> &PathBuf {
232 &self.current_segment
233 }
234
235 pub fn sequence(&self) -> u64 {
237 self.sequence
238 }
239
240 pub fn current_segment_size(&self) -> u64 {
242 self.current_size
243 }
244}
245
246impl Drop for WalWriter {
247 fn drop(&mut self) {
248 let _ = self.sync();
250 }
251}
252
253#[cfg(test)]
254mod tests {
255 use super::*;
256 use tempfile::TempDir;
257
258 fn setup_test_wal() -> (TempDir, PathBuf) {
259 let temp_dir = TempDir::new().expect("Failed to create temp dir");
260 let wal_path = temp_dir.path().join("wal");
261 std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");
262 (temp_dir, wal_path)
263 }
264
265 #[test]
266 fn test_writer_creation() {
267 let (_temp_dir, wal_path) = setup_test_wal();
268
269 let writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
270 .expect("Failed to create writer");
271
272 assert!(writer.current_segment_path().exists());
273 assert_eq!(writer.sequence(), 0);
274 }
275
276 #[test]
277 fn test_append_single_record() {
278 let (_temp_dir, wal_path) = setup_test_wal();
279
280 let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
281 .expect("Failed to create writer");
282
283 let record = WalRecord::put(b"key1".to_vec(), b"value1".to_vec());
284 let seq = writer.append(record).expect("Failed to append");
285
286 assert_eq!(seq, 1);
287 assert!(writer.current_segment_size() > 0);
288 }
289
290 #[test]
291 fn test_append_multiple_records() {
292 let (_temp_dir, wal_path) = setup_test_wal();
293
294 let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
295 .expect("Failed to create writer");
296
297 for i in 0..10 {
298 let record = WalRecord::put(
299 format!("key{}", i).into_bytes(),
300 format!("value{}", i).into_bytes(),
301 );
302 let seq = writer.append(record).expect("Failed to append");
303 assert_eq!(seq, i as u64 + 1);
304 }
305 }
306
307 #[test]
308 fn test_segment_rotation() {
309 let (_temp_dir, wal_path) = setup_test_wal();
310
311 let mut writer =
313 WalWriter::new(&wal_path, 100, SyncMode::Sync).expect("Failed to create writer");
314
315 let initial_segment = writer.current_segment_path().clone();
316
317 for i in 0..10 {
319 let record = WalRecord::put(
320 format!("key{}", i).into_bytes(),
321 format!("value{}", i).into_bytes(),
322 );
323 writer.append(record).expect("Failed to append");
324 }
325
326 assert_ne!(writer.current_segment_path(), &initial_segment);
328
329 let segments: Vec<_> = std::fs::read_dir(&wal_path)
331 .expect("Failed to read dir")
332 .filter_map(|e| e.ok())
333 .filter(|e| {
334 e.path()
335 .extension()
336 .map(|ext| ext == "log")
337 .unwrap_or(false)
338 })
339 .collect();
340
341 assert!(
342 segments.len() > 1,
343 "Expected multiple segments after rotation"
344 );
345 }
346
347 #[test]
348 fn test_sync_modes() {
349 for sync_mode in [SyncMode::Sync, SyncMode::Async, SyncMode::None] {
350 let (_temp_dir, wal_path) = setup_test_wal();
351
352 let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, sync_mode)
353 .expect("Failed to create writer");
354
355 let record = WalRecord::put(b"key".to_vec(), b"value".to_vec());
356 writer.append(record).expect("Failed to append");
357
358 writer.sync().expect("Failed to sync");
360 }
361 }
362
363 #[test]
364 fn test_writer_resume_sequence() {
365 let (_temp_dir, wal_path) = setup_test_wal();
366
367 {
369 let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
370 .expect("Failed to create writer");
371
372 for i in 0..5 {
373 writer
374 .append(WalRecord::put(
375 format!("key{}", i).into_bytes(),
376 format!("value{}", i).into_bytes(),
377 ))
378 .expect("Failed to append");
379 }
380 }
381
382 let writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
384 .expect("Failed to create writer");
385
386 assert!(writer.current_segment_path().exists());
388 }
389
390 #[test]
391 fn test_different_record_types() {
392 let (_temp_dir, wal_path) = setup_test_wal();
393
394 let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
395 .expect("Failed to create writer");
396
397 writer
399 .append(WalRecord::put(b"key1".to_vec(), b"value1".to_vec()))
400 .expect("Failed to append PUT");
401
402 writer
404 .append(WalRecord::delete(b"key2".to_vec()))
405 .expect("Failed to append DELETE");
406
407 writer
409 .append(WalRecord::begin_tx(1))
410 .expect("Failed to append BEGIN_TX");
411 writer
412 .append(WalRecord::commit_tx(1))
413 .expect("Failed to append COMMIT_TX");
414
415 writer
417 .append(WalRecord::checkpoint(100))
418 .expect("Failed to append CHECKPOINT");
419
420 assert_eq!(writer.sequence(), 5);
421 }
422
423 #[test]
424 fn test_large_record() {
425 let (_temp_dir, wal_path) = setup_test_wal();
426
427 let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
428 .expect("Failed to create writer");
429
430 let large_value = vec![0u8; 1024 * 1024];
432 let record = WalRecord::put(b"large_key".to_vec(), large_value);
433
434 writer
435 .append(record)
436 .expect("Failed to append large record");
437
438 assert!(writer.current_segment_size() > 1024 * 1024);
439 }
440}