rustlite_wal/
writer.rs

1// WAL writer module - handles appending records to the log
2use crate::record::WalRecord;
3use crate::SyncMode;
4use rustlite_core::{Error, Result};
5use std::fs::{File, OpenOptions};
6use std::io::{BufWriter, Read, Write};
7use std::path::PathBuf;
8use tracing::{debug, info, instrument};
9
/// Four-byte signature placed at the start of every WAL segment file:
/// the ASCII letters "RLWL" (RustLite WAL).
const WAL_MAGIC_HEADER: [u8; 4] = *b"RLWL";

/// Version number of the WAL on-disk format stamped into new headers (v1.0.0+).
const WAL_FORMAT_VERSION: u16 = 1;
15
/// Fixed-size preamble stored at the very beginning of a WAL segment file (v1.0+).
#[derive(Clone, Debug)]
pub struct WalHeader {
    /// Signature bytes identifying the file as a WAL segment ("RLWL").
    pub magic: [u8; 4],
    /// WAL format version recorded in the file.
    pub version: u16,
}
24
25impl WalHeader {
26    /// Size of header in bytes
27    pub const SIZE: usize = 6; // 4 bytes magic + 2 bytes version
28
29    /// Create a new header with current version
30    pub fn new() -> Self {
31        Self {
32            magic: WAL_MAGIC_HEADER,
33            version: WAL_FORMAT_VERSION,
34        }
35    }
36
37    /// Write header to a writer
38    pub fn write_to<W: Write>(&self, writer: &mut W) -> Result<()> {
39        writer.write_all(&self.magic)?;
40        writer.write_all(&self.version.to_le_bytes())?;
41        Ok(())
42    }
43
44    /// Read header from a reader
45    pub fn read_from<R: Read>(reader: &mut R) -> Result<Self> {
46        let mut magic = [0u8; 4];
47        reader.read_exact(&mut magic)?;
48
49        if magic != WAL_MAGIC_HEADER {
50            return Err(Error::Corruption(format!(
51                "Invalid WAL magic: expected {:?}, got {:?}",
52                WAL_MAGIC_HEADER, magic
53            )));
54        }
55
56        let mut version_bytes = [0u8; 2];
57        reader.read_exact(&mut version_bytes)?;
58        let version = u16::from_le_bytes(version_bytes);
59
60        if version > WAL_FORMAT_VERSION {
61            return Err(Error::Corruption(format!(
62                "Unsupported WAL version: {} (current: {})",
63                version, WAL_FORMAT_VERSION
64            )));
65        }
66
67        Ok(Self { magic, version })
68    }
69}
70pub struct WalWriter {
71    file: BufWriter<File>,
72    current_segment: PathBuf,
73    current_size: u64,
74    max_segment_size: u64,
75    sync_mode: SyncMode,
76    sequence: u64,
77    wal_dir: PathBuf,
78}
79
80impl WalWriter {
81    #[instrument(skip(wal_dir), fields(wal_dir = ?wal_dir, max_segment_size = max_segment_size))]
82    pub fn new(wal_dir: &PathBuf, max_segment_size: u64, sync_mode: SyncMode) -> Result<Self> {
83        info!("Creating WAL writer");
84
85        // Create WAL directory if it doesn't exist
86        std::fs::create_dir_all(wal_dir)
87            .map_err(|e| Error::Storage(format!("Failed to create WAL directory: {}", e)))?;
88
89        // Find existing segments to determine starting sequence
90        let starting_sequence = Self::find_max_sequence(wal_dir)?;
91
92        // Generate segment filename with timestamp
93        let segment_name = format!("wal-{:016x}.log", starting_sequence);
94        let segment_path = wal_dir.join(&segment_name);
95
96        // Open file for appending
97        let mut file = OpenOptions::new()
98            .create(true)
99            .read(true)
100            .append(true)
101            .open(&segment_path)
102            .map_err(|e| Error::Storage(format!("Failed to open WAL segment: {}", e)))?;
103
104        // Get current file size for rotation tracking
105        let current_size = file.metadata().map(|m| m.len()).unwrap_or(0);
106
107        // Write header if this is a new file (v1.0+)
108        if current_size == 0 {
109            let header = WalHeader::new();
110            header.write_to(&mut file)?;
111            file.flush()?;
112            debug!("Wrote WAL header to new segment");
113        }
114
115        // Get actual size after potentially writing header
116        let actual_size = file.metadata().map(|m| m.len()).unwrap_or(0);
117
118        Ok(Self {
119            file: BufWriter::new(file),
120            current_segment: segment_path,
121            current_size: actual_size,
122            max_segment_size,
123            sync_mode,
124            sequence: starting_sequence,
125            wal_dir: wal_dir.clone(),
126        })
127    }
128
129    /// Find the maximum sequence number from existing segments
130    fn find_max_sequence(wal_dir: &PathBuf) -> Result<u64> {
131        let mut max_seq = 0u64;
132
133        if let Ok(entries) = std::fs::read_dir(wal_dir) {
134            for entry in entries.flatten() {
135                if let Some(name) = entry.file_name().to_str() {
136                    if name.starts_with("wal-") && name.ends_with(".log") {
137                        // Parse sequence from filename: wal-{seq}.log
138                        if let Some(seq_str) = name
139                            .strip_prefix("wal-")
140                            .and_then(|s| s.strip_suffix(".log"))
141                        {
142                            if let Ok(seq) = u64::from_str_radix(seq_str, 16) {
143                                max_seq = max_seq.max(seq);
144                            }
145                        }
146                    }
147                }
148            }
149        }
150
151        Ok(max_seq)
152    }
153
154    #[instrument(skip(self, record), fields(record_type = ?record))]
155    pub fn append(&mut self, record: WalRecord) -> Result<u64> {
156        debug!(sequence = self.sequence, "Appending WAL record");
157
158        // Encode the record
159        let encoded = record.encode()?;
160        let record_size = encoded.len() as u64;
161
162        // Check if we need to rotate to a new segment
163        if self.current_size + record_size > self.max_segment_size {
164            self.rotate_segment()?;
165        }
166
167        // Write the encoded record
168        self.file
169            .write_all(&encoded)
170            .map_err(|e| Error::Storage(format!("Failed to write WAL record: {}", e)))?;
171
172        self.current_size += record_size;
173        self.sequence += 1;
174
175        // Sync if required
176        if matches!(self.sync_mode, SyncMode::Sync) {
177            self.sync()?;
178        }
179
180        Ok(self.sequence)
181    }
182
183    pub fn sync(&mut self) -> Result<()> {
184        self.file
185            .flush()
186            .map_err(|e| Error::Storage(format!("Failed to flush WAL: {}", e)))?;
187
188        self.file
189            .get_ref()
190            .sync_all()
191            .map_err(|e| Error::Storage(format!("Failed to sync WAL: {}", e)))?;
192
193        Ok(())
194    }
195
196    fn rotate_segment(&mut self) -> Result<()> {
197        // Sync current segment before rotating
198        self.sync()?;
199
200        // Increment sequence for new segment
201        self.sequence += 1;
202
203        // Generate new segment filename
204        let segment_name = format!("wal-{:016x}.log", self.sequence);
205        let new_segment = self.wal_dir.join(&segment_name);
206
207        // Open new segment
208        let mut file = OpenOptions::new()
209            .create(true)
210            .append(true)
211            .open(&new_segment)
212            .map_err(|e| Error::Storage(format!("Failed to create new segment: {}", e)))?;
213
214        // Write header for new segment (v1.0+)
215        let header = WalHeader::new();
216        header.write_to(&mut file)?;
217        file.flush()?;
218        let header_size = WalHeader::SIZE as u64;
219
220        debug!(segment = ?new_segment, "Rotated to new WAL segment");
221
222        // Update state
223        self.file = BufWriter::new(file);
224        self.current_segment = new_segment;
225        self.current_size = header_size;
226
227        Ok(())
228    }
229
230    /// Get the current segment path
231    pub fn current_segment_path(&self) -> &PathBuf {
232        &self.current_segment
233    }
234
235    /// Get the current sequence number
236    pub fn sequence(&self) -> u64 {
237        self.sequence
238    }
239
240    /// Get the current segment size in bytes
241    pub fn current_segment_size(&self) -> u64 {
242        self.current_size
243    }
244}
245
246impl Drop for WalWriter {
247    fn drop(&mut self) {
248        // Best effort sync on drop
249        let _ = self.sync();
250    }
251}
252
253#[cfg(test)]
254mod tests {
255    use super::*;
256    use tempfile::TempDir;
257
258    fn setup_test_wal() -> (TempDir, PathBuf) {
259        let temp_dir = TempDir::new().expect("Failed to create temp dir");
260        let wal_path = temp_dir.path().join("wal");
261        std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");
262        (temp_dir, wal_path)
263    }
264
265    #[test]
266    fn test_writer_creation() {
267        let (_temp_dir, wal_path) = setup_test_wal();
268
269        let writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
270            .expect("Failed to create writer");
271
272        assert!(writer.current_segment_path().exists());
273        assert_eq!(writer.sequence(), 0);
274    }
275
276    #[test]
277    fn test_append_single_record() {
278        let (_temp_dir, wal_path) = setup_test_wal();
279
280        let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
281            .expect("Failed to create writer");
282
283        let record = WalRecord::put(b"key1".to_vec(), b"value1".to_vec());
284        let seq = writer.append(record).expect("Failed to append");
285
286        assert_eq!(seq, 1);
287        assert!(writer.current_segment_size() > 0);
288    }
289
290    #[test]
291    fn test_append_multiple_records() {
292        let (_temp_dir, wal_path) = setup_test_wal();
293
294        let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
295            .expect("Failed to create writer");
296
297        for i in 0..10 {
298            let record = WalRecord::put(
299                format!("key{}", i).into_bytes(),
300                format!("value{}", i).into_bytes(),
301            );
302            let seq = writer.append(record).expect("Failed to append");
303            assert_eq!(seq, i as u64 + 1);
304        }
305    }
306
307    #[test]
308    fn test_segment_rotation() {
309        let (_temp_dir, wal_path) = setup_test_wal();
310
311        // Use small segment size to force rotation
312        let mut writer =
313            WalWriter::new(&wal_path, 100, SyncMode::Sync).expect("Failed to create writer");
314
315        let initial_segment = writer.current_segment_path().clone();
316
317        // Write enough records to trigger rotation
318        for i in 0..10 {
319            let record = WalRecord::put(
320                format!("key{}", i).into_bytes(),
321                format!("value{}", i).into_bytes(),
322            );
323            writer.append(record).expect("Failed to append");
324        }
325
326        // Segment should have changed
327        assert_ne!(writer.current_segment_path(), &initial_segment);
328
329        // Should have multiple segment files
330        let segments: Vec<_> = std::fs::read_dir(&wal_path)
331            .expect("Failed to read dir")
332            .filter_map(|e| e.ok())
333            .filter(|e| {
334                e.path()
335                    .extension()
336                    .map(|ext| ext == "log")
337                    .unwrap_or(false)
338            })
339            .collect();
340
341        assert!(
342            segments.len() > 1,
343            "Expected multiple segments after rotation"
344        );
345    }
346
347    #[test]
348    fn test_sync_modes() {
349        for sync_mode in [SyncMode::Sync, SyncMode::Async, SyncMode::None] {
350            let (_temp_dir, wal_path) = setup_test_wal();
351
352            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, sync_mode)
353                .expect("Failed to create writer");
354
355            let record = WalRecord::put(b"key".to_vec(), b"value".to_vec());
356            writer.append(record).expect("Failed to append");
357
358            // Explicit sync should work in all modes
359            writer.sync().expect("Failed to sync");
360        }
361    }
362
363    #[test]
364    fn test_writer_resume_sequence() {
365        let (_temp_dir, wal_path) = setup_test_wal();
366
367        // Write some records
368        {
369            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
370                .expect("Failed to create writer");
371
372            for i in 0..5 {
373                writer
374                    .append(WalRecord::put(
375                        format!("key{}", i).into_bytes(),
376                        format!("value{}", i).into_bytes(),
377                    ))
378                    .expect("Failed to append");
379            }
380        }
381
382        // Create new writer - should resume from existing sequence
383        let writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
384            .expect("Failed to create writer");
385
386        // Should pick up from the existing segment
387        assert!(writer.current_segment_path().exists());
388    }
389
390    #[test]
391    fn test_different_record_types() {
392        let (_temp_dir, wal_path) = setup_test_wal();
393
394        let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
395            .expect("Failed to create writer");
396
397        // PUT record
398        writer
399            .append(WalRecord::put(b"key1".to_vec(), b"value1".to_vec()))
400            .expect("Failed to append PUT");
401
402        // DELETE record
403        writer
404            .append(WalRecord::delete(b"key2".to_vec()))
405            .expect("Failed to append DELETE");
406
407        // Transaction records
408        writer
409            .append(WalRecord::begin_tx(1))
410            .expect("Failed to append BEGIN_TX");
411        writer
412            .append(WalRecord::commit_tx(1))
413            .expect("Failed to append COMMIT_TX");
414
415        // Checkpoint record
416        writer
417            .append(WalRecord::checkpoint(100))
418            .expect("Failed to append CHECKPOINT");
419
420        assert_eq!(writer.sequence(), 5);
421    }
422
423    #[test]
424    fn test_large_record() {
425        let (_temp_dir, wal_path) = setup_test_wal();
426
427        let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
428            .expect("Failed to create writer");
429
430        // Create a large value (1MB)
431        let large_value = vec![0u8; 1024 * 1024];
432        let record = WalRecord::put(b"large_key".to_vec(), large_value);
433
434        writer
435            .append(record)
436            .expect("Failed to append large record");
437
438        assert!(writer.current_segment_size() > 1024 * 1024);
439    }
440}