rustlite_wal/
writer.rs

// WAL writer module - handles appending records to the log
use crate::record::WalRecord;
use crate::SyncMode;
use rustlite_core::{Error, Result};
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use tracing::{debug, info, instrument};
9
10pub struct WalWriter {
11    file: BufWriter<File>,
12    current_segment: PathBuf,
13    current_size: u64,
14    max_segment_size: u64,
15    sync_mode: SyncMode,
16    sequence: u64,
17    wal_dir: PathBuf,
18}
19
20impl WalWriter {
21    #[instrument(skip(wal_dir), fields(wal_dir = ?wal_dir, max_segment_size = max_segment_size))]
22    pub fn new(wal_dir: &PathBuf, max_segment_size: u64, sync_mode: SyncMode) -> Result<Self> {
23        info!("Creating WAL writer");
24
25        // Create WAL directory if it doesn't exist
26        std::fs::create_dir_all(wal_dir)
27            .map_err(|e| Error::Storage(format!("Failed to create WAL directory: {}", e)))?;
28
29        // Find existing segments to determine starting sequence
30        let starting_sequence = Self::find_max_sequence(wal_dir)?;
31
32        // Generate segment filename with timestamp
33        let segment_name = format!("wal-{:016x}.log", starting_sequence);
34        let segment_path = wal_dir.join(&segment_name);
35
36        // Open file for appending
37        let file = OpenOptions::new()
38            .create(true)
39            .append(true)
40            .open(&segment_path)
41            .map_err(|e| Error::Storage(format!("Failed to open WAL segment: {}", e)))?;
42
43        // Get current file size for rotation tracking
44        let current_size = file.metadata().map(|m| m.len()).unwrap_or(0);
45
46        Ok(Self {
47            file: BufWriter::new(file),
48            current_segment: segment_path,
49            current_size,
50            max_segment_size,
51            sync_mode,
52            sequence: starting_sequence,
53            wal_dir: wal_dir.clone(),
54        })
55    }
56
57    /// Find the maximum sequence number from existing segments
58    fn find_max_sequence(wal_dir: &PathBuf) -> Result<u64> {
59        let mut max_seq = 0u64;
60
61        if let Ok(entries) = std::fs::read_dir(wal_dir) {
62            for entry in entries.flatten() {
63                if let Some(name) = entry.file_name().to_str() {
64                    if name.starts_with("wal-") && name.ends_with(".log") {
65                        // Parse sequence from filename: wal-{seq}.log
66                        if let Some(seq_str) = name
67                            .strip_prefix("wal-")
68                            .and_then(|s| s.strip_suffix(".log"))
69                        {
70                            if let Ok(seq) = u64::from_str_radix(seq_str, 16) {
71                                max_seq = max_seq.max(seq);
72                            }
73                        }
74                    }
75                }
76            }
77        }
78
79        Ok(max_seq)
80    }
81
82    #[instrument(skip(self, record), fields(record_type = ?record))]
83    pub fn append(&mut self, record: WalRecord) -> Result<u64> {
84        debug!(sequence = self.sequence, "Appending WAL record");
85
86        // Encode the record
87        let encoded = record.encode()?;
88        let record_size = encoded.len() as u64;
89
90        // Check if we need to rotate to a new segment
91        if self.current_size + record_size > self.max_segment_size {
92            self.rotate_segment()?;
93        }
94
95        // Write the encoded record
96        self.file
97            .write_all(&encoded)
98            .map_err(|e| Error::Storage(format!("Failed to write WAL record: {}", e)))?;
99
100        self.current_size += record_size;
101        self.sequence += 1;
102
103        // Sync if required
104        if matches!(self.sync_mode, SyncMode::Sync) {
105            self.sync()?;
106        }
107
108        Ok(self.sequence)
109    }
110
111    pub fn sync(&mut self) -> Result<()> {
112        self.file
113            .flush()
114            .map_err(|e| Error::Storage(format!("Failed to flush WAL: {}", e)))?;
115
116        self.file
117            .get_ref()
118            .sync_all()
119            .map_err(|e| Error::Storage(format!("Failed to sync WAL: {}", e)))?;
120
121        Ok(())
122    }
123
124    fn rotate_segment(&mut self) -> Result<()> {
125        // Sync current segment before rotating
126        self.sync()?;
127
128        // Generate new segment filename
129        let segment_name = format!("wal-{:016x}.log", self.sequence + 1);
130        let new_segment = self.wal_dir.join(&segment_name);
131
132        // Open new segment
133        let file = OpenOptions::new()
134            .create(true)
135            .append(true)
136            .open(&new_segment)
137            .map_err(|e| Error::Storage(format!("Failed to create new segment: {}", e)))?;
138
139        // Update state
140        self.file = BufWriter::new(file);
141        self.current_segment = new_segment;
142        self.current_size = 0;
143
144        Ok(())
145    }
146
147    /// Get the current segment path
148    pub fn current_segment_path(&self) -> &PathBuf {
149        &self.current_segment
150    }
151
152    /// Get the current sequence number
153    pub fn sequence(&self) -> u64 {
154        self.sequence
155    }
156
157    /// Get the current segment size in bytes
158    pub fn current_segment_size(&self) -> u64 {
159        self.current_size
160    }
161}
162
163impl Drop for WalWriter {
164    fn drop(&mut self) {
165        // Best effort sync on drop
166        let _ = self.sync();
167    }
168}
169
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Segment limit large enough that the small-record tests never rotate.
    const LARGE_SEGMENT: u64 = 64 * 1024 * 1024;

    /// Creates a temporary directory containing an empty `wal` subdirectory.
    ///
    /// The `TempDir` guard is returned so the directory outlives the test body.
    fn setup_test_wal() -> (TempDir, PathBuf) {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let wal_path = temp_dir.path().join("wal");
        std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");
        (temp_dir, wal_path)
    }

    /// Builds a small PUT record whose key and value embed `i`.
    fn numbered_put(i: u64) -> WalRecord {
        WalRecord::put(
            format!("key{}", i).into_bytes(),
            format!("value{}", i).into_bytes(),
        )
    }

    #[test]
    fn test_writer_creation() {
        let (_temp_dir, wal_path) = setup_test_wal();

        let writer = WalWriter::new(&wal_path, LARGE_SEGMENT, SyncMode::Sync)
            .expect("Failed to create writer");

        assert!(writer.current_segment_path().exists());
        assert_eq!(writer.sequence(), 0);
    }

    #[test]
    fn test_append_single_record() {
        let (_temp_dir, wal_path) = setup_test_wal();

        let mut writer = WalWriter::new(&wal_path, LARGE_SEGMENT, SyncMode::Sync)
            .expect("Failed to create writer");

        let record = WalRecord::put(b"key1".to_vec(), b"value1".to_vec());
        let seq = writer.append(record).expect("Failed to append");

        assert_eq!(seq, 1);
        assert!(writer.current_segment_size() > 0);
    }

    #[test]
    fn test_append_multiple_records() {
        let (_temp_dir, wal_path) = setup_test_wal();

        let mut writer = WalWriter::new(&wal_path, LARGE_SEGMENT, SyncMode::Sync)
            .expect("Failed to create writer");

        for i in 0..10u64 {
            let seq = writer.append(numbered_put(i)).expect("Failed to append");
            assert_eq!(seq, i + 1);
        }
    }

    #[test]
    fn test_segment_rotation() {
        let (_temp_dir, wal_path) = setup_test_wal();

        // Use small segment size to force rotation
        let mut writer =
            WalWriter::new(&wal_path, 100, SyncMode::Sync).expect("Failed to create writer");

        let initial_segment = writer.current_segment_path().clone();

        // Write enough records to trigger rotation
        for i in 0..10u64 {
            writer.append(numbered_put(i)).expect("Failed to append");
        }

        // Segment should have changed
        assert_ne!(writer.current_segment_path(), &initial_segment);

        // Should have multiple segment files
        let segment_count = std::fs::read_dir(&wal_path)
            .expect("Failed to read dir")
            .filter_map(|e| e.ok())
            .filter(|e| {
                e.path()
                    .extension()
                    .map(|ext| ext == "log")
                    .unwrap_or(false)
            })
            .count();

        assert!(
            segment_count > 1,
            "Expected multiple segments after rotation"
        );
    }

    #[test]
    fn test_sync_modes() {
        for sync_mode in [SyncMode::Sync, SyncMode::Async, SyncMode::None] {
            let (_temp_dir, wal_path) = setup_test_wal();

            let mut writer = WalWriter::new(&wal_path, LARGE_SEGMENT, sync_mode)
                .expect("Failed to create writer");

            let record = WalRecord::put(b"key".to_vec(), b"value".to_vec());
            writer.append(record).expect("Failed to append");

            // Explicit sync should work in all modes
            writer.sync().expect("Failed to sync");
        }
    }

    #[test]
    fn test_writer_resume_sequence() {
        let (_temp_dir, wal_path) = setup_test_wal();

        // Write some records, remembering where they went and how many bytes they took.
        let (first_segment, bytes_written) = {
            let mut writer = WalWriter::new(&wal_path, LARGE_SEGMENT, SyncMode::Sync)
                .expect("Failed to create writer");

            for i in 0..5u64 {
                writer.append(numbered_put(i)).expect("Failed to append");
            }

            (
                writer.current_segment_path().clone(),
                writer.current_segment_size(),
            )
        };

        // Create new writer - should resume from the existing segment
        let writer = WalWriter::new(&wal_path, LARGE_SEGMENT, SyncMode::Sync)
            .expect("Failed to create writer");

        // A bare existence check would also pass for a brand-new segment, so verify
        // that the same file was reopened and its existing bytes were accounted for.
        assert!(writer.current_segment_path().exists());
        assert_eq!(writer.current_segment_path(), &first_segment);
        assert_eq!(writer.current_segment_size(), bytes_written);
    }

    #[test]
    fn test_different_record_types() {
        let (_temp_dir, wal_path) = setup_test_wal();

        let mut writer = WalWriter::new(&wal_path, LARGE_SEGMENT, SyncMode::Sync)
            .expect("Failed to create writer");

        // PUT record
        writer
            .append(WalRecord::put(b"key1".to_vec(), b"value1".to_vec()))
            .expect("Failed to append PUT");

        // DELETE record
        writer
            .append(WalRecord::delete(b"key2".to_vec()))
            .expect("Failed to append DELETE");

        // Transaction records
        writer
            .append(WalRecord::begin_tx(1))
            .expect("Failed to append BEGIN_TX");
        writer
            .append(WalRecord::commit_tx(1))
            .expect("Failed to append COMMIT_TX");

        // Checkpoint record
        writer
            .append(WalRecord::checkpoint(100))
            .expect("Failed to append CHECKPOINT");

        assert_eq!(writer.sequence(), 5);
    }

    #[test]
    fn test_large_record() {
        let (_temp_dir, wal_path) = setup_test_wal();

        let mut writer = WalWriter::new(&wal_path, LARGE_SEGMENT, SyncMode::Sync)
            .expect("Failed to create writer");

        // Create a large value (1MB)
        let large_value = vec![0u8; 1024 * 1024];
        let record = WalRecord::put(b"large_key".to_vec(), large_value);

        writer
            .append(record)
            .expect("Failed to append large record");

        assert!(writer.current_segment_size() > 1024 * 1024);
    }
}
357}