rustlite_wal/
writer.rs

1// WAL writer module - handles appending records to the log
2use crate::record::WalRecord;
3use crate::SyncMode;
4use rustlite_core::{Error, Result};
5use std::fs::{File, OpenOptions};
6use std::io::{BufWriter, Write};
7use std::path::PathBuf;
8
9pub struct WalWriter {
10    file: BufWriter<File>,
11    current_segment: PathBuf,
12    current_size: u64,
13    max_segment_size: u64,
14    sync_mode: SyncMode,
15    sequence: u64,
16    wal_dir: PathBuf,
17}
18
19impl WalWriter {
20    pub fn new(wal_dir: &PathBuf, max_segment_size: u64, sync_mode: SyncMode) -> Result<Self> {
21        // Create WAL directory if it doesn't exist
22        std::fs::create_dir_all(wal_dir)
23            .map_err(|e| Error::Storage(format!("Failed to create WAL directory: {}", e)))?;
24
25        // Find existing segments to determine starting sequence
26        let starting_sequence = Self::find_max_sequence(wal_dir)?;
27
28        // Generate segment filename with timestamp
29        let segment_name = format!("wal-{:016x}.log", starting_sequence);
30        let segment_path = wal_dir.join(&segment_name);
31
32        // Open file for appending
33        let file = OpenOptions::new()
34            .create(true)
35            .append(true)
36            .open(&segment_path)
37            .map_err(|e| Error::Storage(format!("Failed to open WAL segment: {}", e)))?;
38
39        // Get current file size for rotation tracking
40        let current_size = file
41            .metadata()
42            .map(|m| m.len())
43            .unwrap_or(0);
44
45        Ok(Self {
46            file: BufWriter::new(file),
47            current_segment: segment_path,
48            current_size,
49            max_segment_size,
50            sync_mode,
51            sequence: starting_sequence,
52            wal_dir: wal_dir.clone(),
53        })
54    }
55
56    /// Find the maximum sequence number from existing segments
57    fn find_max_sequence(wal_dir: &PathBuf) -> Result<u64> {
58        let mut max_seq = 0u64;
59
60        if let Ok(entries) = std::fs::read_dir(wal_dir) {
61            for entry in entries.flatten() {
62                if let Some(name) = entry.file_name().to_str() {
63                    if name.starts_with("wal-") && name.ends_with(".log") {
64                        // Parse sequence from filename: wal-{seq}.log
65                        if let Some(seq_str) = name.strip_prefix("wal-").and_then(|s| s.strip_suffix(".log")) {
66                            if let Ok(seq) = u64::from_str_radix(seq_str, 16) {
67                                max_seq = max_seq.max(seq);
68                            }
69                        }
70                    }
71                }
72            }
73        }
74
75        Ok(max_seq)
76    }
77
78    pub fn append(&mut self, record: WalRecord) -> Result<u64> {
79        // Encode the record
80        let encoded = record.encode()?;
81        let record_size = encoded.len() as u64;
82
83        // Check if we need to rotate to a new segment
84        if self.current_size + record_size > self.max_segment_size {
85            self.rotate_segment()?;
86        }
87
88        // Write the encoded record
89        self.file
90            .write_all(&encoded)
91            .map_err(|e| Error::Storage(format!("Failed to write WAL record: {}", e)))?;
92
93        self.current_size += record_size;
94        self.sequence += 1;
95
96        // Sync if required
97        if matches!(self.sync_mode, SyncMode::Sync) {
98            self.sync()?;
99        }
100
101        Ok(self.sequence)
102    }
103
104    pub fn sync(&mut self) -> Result<()> {
105        self.file
106            .flush()
107            .map_err(|e| Error::Storage(format!("Failed to flush WAL: {}", e)))?;
108
109        self.file
110            .get_ref()
111            .sync_all()
112            .map_err(|e| Error::Storage(format!("Failed to sync WAL: {}", e)))?;
113
114        Ok(())
115    }
116
117    fn rotate_segment(&mut self) -> Result<()> {
118        // Sync current segment before rotating
119        self.sync()?;
120
121        // Generate new segment filename
122        let segment_name = format!("wal-{:016x}.log", self.sequence + 1);
123        let new_segment = self.wal_dir.join(&segment_name);
124
125        // Open new segment
126        let file = OpenOptions::new()
127            .create(true)
128            .append(true)
129            .open(&new_segment)
130            .map_err(|e| Error::Storage(format!("Failed to create new segment: {}", e)))?;
131
132        // Update state
133        self.file = BufWriter::new(file);
134        self.current_segment = new_segment;
135        self.current_size = 0;
136
137        Ok(())
138    }
139
140    /// Get the current segment path
141    pub fn current_segment_path(&self) -> &PathBuf {
142        &self.current_segment
143    }
144
145    /// Get the current sequence number
146    pub fn sequence(&self) -> u64 {
147        self.sequence
148    }
149
150    /// Get the current segment size in bytes
151    pub fn current_segment_size(&self) -> u64 {
152        self.current_size
153    }
154}
155
156impl Drop for WalWriter {
157    fn drop(&mut self) {
158        // Best effort sync on drop
159        let _ = self.sync();
160    }
161}
162
163#[cfg(test)]
164mod tests {
165    use super::*;
166    use tempfile::TempDir;
167
168    fn setup_test_wal() -> (TempDir, PathBuf) {
169        let temp_dir = TempDir::new().expect("Failed to create temp dir");
170        let wal_path = temp_dir.path().join("wal");
171        std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");
172        (temp_dir, wal_path)
173    }
174
175    #[test]
176    fn test_writer_creation() {
177        let (_temp_dir, wal_path) = setup_test_wal();
178
179        let writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
180            .expect("Failed to create writer");
181
182        assert!(writer.current_segment_path().exists());
183        assert_eq!(writer.sequence(), 0);
184    }
185
186    #[test]
187    fn test_append_single_record() {
188        let (_temp_dir, wal_path) = setup_test_wal();
189
190        let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
191            .expect("Failed to create writer");
192
193        let record = WalRecord::put(b"key1".to_vec(), b"value1".to_vec());
194        let seq = writer.append(record).expect("Failed to append");
195
196        assert_eq!(seq, 1);
197        assert!(writer.current_segment_size() > 0);
198    }
199
200    #[test]
201    fn test_append_multiple_records() {
202        let (_temp_dir, wal_path) = setup_test_wal();
203
204        let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
205            .expect("Failed to create writer");
206
207        for i in 0..10 {
208            let record = WalRecord::put(
209                format!("key{}", i).into_bytes(),
210                format!("value{}", i).into_bytes(),
211            );
212            let seq = writer.append(record).expect("Failed to append");
213            assert_eq!(seq, i as u64 + 1);
214        }
215    }
216
217    #[test]
218    fn test_segment_rotation() {
219        let (_temp_dir, wal_path) = setup_test_wal();
220
221        // Use small segment size to force rotation
222        let mut writer = WalWriter::new(&wal_path, 100, SyncMode::Sync)
223            .expect("Failed to create writer");
224
225        let initial_segment = writer.current_segment_path().clone();
226
227        // Write enough records to trigger rotation
228        for i in 0..10 {
229            let record = WalRecord::put(
230                format!("key{}", i).into_bytes(),
231                format!("value{}", i).into_bytes(),
232            );
233            writer.append(record).expect("Failed to append");
234        }
235
236        // Segment should have changed
237        assert_ne!(writer.current_segment_path(), &initial_segment);
238
239        // Should have multiple segment files
240        let segments: Vec<_> = std::fs::read_dir(&wal_path)
241            .expect("Failed to read dir")
242            .filter_map(|e| e.ok())
243            .filter(|e| e.path().extension().map(|ext| ext == "log").unwrap_or(false))
244            .collect();
245
246        assert!(segments.len() > 1, "Expected multiple segments after rotation");
247    }
248
249    #[test]
250    fn test_sync_modes() {
251        for sync_mode in [SyncMode::Sync, SyncMode::Async, SyncMode::None] {
252            let (_temp_dir, wal_path) = setup_test_wal();
253
254            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, sync_mode)
255                .expect("Failed to create writer");
256
257            let record = WalRecord::put(b"key".to_vec(), b"value".to_vec());
258            writer.append(record).expect("Failed to append");
259
260            // Explicit sync should work in all modes
261            writer.sync().expect("Failed to sync");
262        }
263    }
264
265    #[test]
266    fn test_writer_resume_sequence() {
267        let (_temp_dir, wal_path) = setup_test_wal();
268
269        // Write some records
270        {
271            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
272                .expect("Failed to create writer");
273
274            for i in 0..5 {
275                writer
276                    .append(WalRecord::put(
277                        format!("key{}", i).into_bytes(),
278                        format!("value{}", i).into_bytes(),
279                    ))
280                    .expect("Failed to append");
281            }
282        }
283
284        // Create new writer - should resume from existing sequence
285        let writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
286            .expect("Failed to create writer");
287
288        // Should pick up from the existing segment
289        assert!(writer.current_segment_path().exists());
290    }
291
292    #[test]
293    fn test_different_record_types() {
294        let (_temp_dir, wal_path) = setup_test_wal();
295
296        let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
297            .expect("Failed to create writer");
298
299        // PUT record
300        writer
301            .append(WalRecord::put(b"key1".to_vec(), b"value1".to_vec()))
302            .expect("Failed to append PUT");
303
304        // DELETE record
305        writer
306            .append(WalRecord::delete(b"key2".to_vec()))
307            .expect("Failed to append DELETE");
308
309        // Transaction records
310        writer
311            .append(WalRecord::begin_tx(1))
312            .expect("Failed to append BEGIN_TX");
313        writer
314            .append(WalRecord::commit_tx(1))
315            .expect("Failed to append COMMIT_TX");
316
317        // Checkpoint record
318        writer
319            .append(WalRecord::checkpoint(100))
320            .expect("Failed to append CHECKPOINT");
321
322        assert_eq!(writer.sequence(), 5);
323    }
324
325    #[test]
326    fn test_large_record() {
327        let (_temp_dir, wal_path) = setup_test_wal();
328
329        let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
330            .expect("Failed to create writer");
331
332        // Create a large value (1MB)
333        let large_value = vec![0u8; 1024 * 1024];
334        let record = WalRecord::put(b"large_key".to_vec(), large_value);
335
336        writer.append(record).expect("Failed to append large record");
337
338        assert!(writer.current_segment_size() > 1024 * 1024);
339    }
340}