1use crate::record::WalRecord;
3use crate::SyncMode;
4use rustlite_core::{Error, Result};
5use std::fs::{File, OpenOptions};
6use std::io::{BufWriter, Write};
7use std::path::PathBuf;
8
9pub struct WalWriter {
10 file: BufWriter<File>,
11 current_segment: PathBuf,
12 current_size: u64,
13 max_segment_size: u64,
14 sync_mode: SyncMode,
15 sequence: u64,
16 wal_dir: PathBuf,
17}
18
19impl WalWriter {
20 pub fn new(wal_dir: &PathBuf, max_segment_size: u64, sync_mode: SyncMode) -> Result<Self> {
21 std::fs::create_dir_all(wal_dir)
23 .map_err(|e| Error::Storage(format!("Failed to create WAL directory: {}", e)))?;
24
25 let starting_sequence = Self::find_max_sequence(wal_dir)?;
27
28 let segment_name = format!("wal-{:016x}.log", starting_sequence);
30 let segment_path = wal_dir.join(&segment_name);
31
32 let file = OpenOptions::new()
34 .create(true)
35 .append(true)
36 .open(&segment_path)
37 .map_err(|e| Error::Storage(format!("Failed to open WAL segment: {}", e)))?;
38
39 let current_size = file
41 .metadata()
42 .map(|m| m.len())
43 .unwrap_or(0);
44
45 Ok(Self {
46 file: BufWriter::new(file),
47 current_segment: segment_path,
48 current_size,
49 max_segment_size,
50 sync_mode,
51 sequence: starting_sequence,
52 wal_dir: wal_dir.clone(),
53 })
54 }
55
56 fn find_max_sequence(wal_dir: &PathBuf) -> Result<u64> {
58 let mut max_seq = 0u64;
59
60 if let Ok(entries) = std::fs::read_dir(wal_dir) {
61 for entry in entries.flatten() {
62 if let Some(name) = entry.file_name().to_str() {
63 if name.starts_with("wal-") && name.ends_with(".log") {
64 if let Some(seq_str) = name.strip_prefix("wal-").and_then(|s| s.strip_suffix(".log")) {
66 if let Ok(seq) = u64::from_str_radix(seq_str, 16) {
67 max_seq = max_seq.max(seq);
68 }
69 }
70 }
71 }
72 }
73 }
74
75 Ok(max_seq)
76 }
77
78 pub fn append(&mut self, record: WalRecord) -> Result<u64> {
79 let encoded = record.encode()?;
81 let record_size = encoded.len() as u64;
82
83 if self.current_size + record_size > self.max_segment_size {
85 self.rotate_segment()?;
86 }
87
88 self.file
90 .write_all(&encoded)
91 .map_err(|e| Error::Storage(format!("Failed to write WAL record: {}", e)))?;
92
93 self.current_size += record_size;
94 self.sequence += 1;
95
96 if matches!(self.sync_mode, SyncMode::Sync) {
98 self.sync()?;
99 }
100
101 Ok(self.sequence)
102 }
103
104 pub fn sync(&mut self) -> Result<()> {
105 self.file
106 .flush()
107 .map_err(|e| Error::Storage(format!("Failed to flush WAL: {}", e)))?;
108
109 self.file
110 .get_ref()
111 .sync_all()
112 .map_err(|e| Error::Storage(format!("Failed to sync WAL: {}", e)))?;
113
114 Ok(())
115 }
116
117 fn rotate_segment(&mut self) -> Result<()> {
118 self.sync()?;
120
121 let segment_name = format!("wal-{:016x}.log", self.sequence + 1);
123 let new_segment = self.wal_dir.join(&segment_name);
124
125 let file = OpenOptions::new()
127 .create(true)
128 .append(true)
129 .open(&new_segment)
130 .map_err(|e| Error::Storage(format!("Failed to create new segment: {}", e)))?;
131
132 self.file = BufWriter::new(file);
134 self.current_segment = new_segment;
135 self.current_size = 0;
136
137 Ok(())
138 }
139
140 pub fn current_segment_path(&self) -> &PathBuf {
142 &self.current_segment
143 }
144
145 pub fn sequence(&self) -> u64 {
147 self.sequence
148 }
149
150 pub fn current_segment_size(&self) -> u64 {
152 self.current_size
153 }
154}
155
156impl Drop for WalWriter {
157 fn drop(&mut self) {
158 let _ = self.sync();
160 }
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// 64 MiB segment limit used by the tests that are not about rotation.
    const LARGE_SEGMENT: u64 = 64 * 1024 * 1024;

    /// Creates a temporary directory with a `wal` subdirectory inside it.
    /// The `TempDir` guard is returned so the directory lives as long as the
    /// caller keeps it.
    fn setup_test_wal() -> (TempDir, PathBuf) {
        let root = TempDir::new().expect("Failed to create temp dir");
        let wal_dir = root.path().join("wal");
        std::fs::create_dir_all(&wal_dir).expect("Failed to create WAL dir");
        (root, wal_dir)
    }

    /// Opens a writer on `wal_dir`, panicking if construction fails.
    fn open_writer(wal_dir: &PathBuf, max_segment_size: u64, sync_mode: SyncMode) -> WalWriter {
        WalWriter::new(wal_dir, max_segment_size, sync_mode).expect("Failed to create writer")
    }

    /// Builds the PUT record `key<i>` -> `value<i>`.
    fn numbered_put(i: u64) -> WalRecord {
        let key = format!("key{}", i).into_bytes();
        let value = format!("value{}", i).into_bytes();
        WalRecord::put(key, value)
    }

    #[test]
    fn test_writer_creation() {
        let (_guard, wal_dir) = setup_test_wal();
        let writer = open_writer(&wal_dir, LARGE_SEGMENT, SyncMode::Sync);

        // A fresh directory yields an existing segment file and sequence 0.
        assert!(writer.current_segment_path().exists());
        assert_eq!(writer.sequence(), 0);
    }

    #[test]
    fn test_append_single_record() {
        let (_guard, wal_dir) = setup_test_wal();
        let mut writer = open_writer(&wal_dir, LARGE_SEGMENT, SyncMode::Sync);

        let seq = writer
            .append(WalRecord::put(b"key1".to_vec(), b"value1".to_vec()))
            .expect("Failed to append");

        assert_eq!(seq, 1);
        assert!(writer.current_segment_size() > 0);
    }

    #[test]
    fn test_append_multiple_records() {
        let (_guard, wal_dir) = setup_test_wal();
        let mut writer = open_writer(&wal_dir, LARGE_SEGMENT, SyncMode::Sync);

        // Sequence numbers are handed out consecutively starting at 1.
        for i in 0..10u64 {
            let seq = writer.append(numbered_put(i)).expect("Failed to append");
            assert_eq!(seq, i + 1);
        }
    }

    #[test]
    fn test_segment_rotation() {
        let (_guard, wal_dir) = setup_test_wal();

        // A 100-byte limit forces rotation after a handful of records.
        let mut writer = open_writer(&wal_dir, 100, SyncMode::Sync);
        let initial_segment = writer.current_segment_path().clone();

        for i in 0..10u64 {
            writer.append(numbered_put(i)).expect("Failed to append");
        }

        assert_ne!(writer.current_segment_path(), &initial_segment);

        let segment_count = std::fs::read_dir(&wal_dir)
            .expect("Failed to read dir")
            .flatten()
            .filter(|entry| matches!(entry.path().extension(), Some(ext) if ext == "log"))
            .count();

        assert!(segment_count > 1, "Expected multiple segments after rotation");
    }

    #[test]
    fn test_sync_modes() {
        // Every mode must support both appending and an explicit sync.
        for sync_mode in [SyncMode::Sync, SyncMode::Async, SyncMode::None] {
            let (_guard, wal_dir) = setup_test_wal();
            let mut writer = open_writer(&wal_dir, LARGE_SEGMENT, sync_mode);

            writer
                .append(WalRecord::put(b"key".to_vec(), b"value".to_vec()))
                .expect("Failed to append");

            writer.sync().expect("Failed to sync");
        }
    }

    #[test]
    fn test_writer_resume_sequence() {
        let (_guard, wal_dir) = setup_test_wal();

        let mut first = open_writer(&wal_dir, LARGE_SEGMENT, SyncMode::Sync);
        for i in 0..5u64 {
            first.append(numbered_put(i)).expect("Failed to append");
        }
        // Close the first writer before reopening the same directory.
        drop(first);

        let resumed = open_writer(&wal_dir, LARGE_SEGMENT, SyncMode::Sync);
        assert!(resumed.current_segment_path().exists());
    }

    #[test]
    fn test_different_record_types() {
        let (_guard, wal_dir) = setup_test_wal();
        let mut writer = open_writer(&wal_dir, LARGE_SEGMENT, SyncMode::Sync);

        // One record of each kind, paired with the message to panic with.
        let records = [
            (
                WalRecord::put(b"key1".to_vec(), b"value1".to_vec()),
                "Failed to append PUT",
            ),
            (
                WalRecord::delete(b"key2".to_vec()),
                "Failed to append DELETE",
            ),
            (WalRecord::begin_tx(1), "Failed to append BEGIN_TX"),
            (WalRecord::commit_tx(1), "Failed to append COMMIT_TX"),
            (WalRecord::checkpoint(100), "Failed to append CHECKPOINT"),
        ];

        for (record, failure_message) in records {
            writer.append(record).expect(failure_message);
        }

        assert_eq!(writer.sequence(), 5);
    }

    #[test]
    fn test_large_record() {
        let (_guard, wal_dir) = setup_test_wal();
        let mut writer = open_writer(&wal_dir, LARGE_SEGMENT, SyncMode::Sync);

        // 1 MiB payload.
        let payload = vec![0u8; 1024 * 1024];
        writer
            .append(WalRecord::put(b"large_key".to_vec(), payload))
            .expect("Failed to append large record");

        assert!(writer.current_segment_size() > 1024 * 1024);
    }
}