1use crate::record::WalRecord;
3use crate::SyncMode;
4use rustlite_core::{Error, Result};
5use std::fs::{File, OpenOptions};
6use std::io::{BufWriter, Write};
7use std::path::PathBuf;
8use tracing::{debug, info, instrument};
9
10pub struct WalWriter {
11 file: BufWriter<File>,
12 current_segment: PathBuf,
13 current_size: u64,
14 max_segment_size: u64,
15 sync_mode: SyncMode,
16 sequence: u64,
17 wal_dir: PathBuf,
18}
19
20impl WalWriter {
21 #[instrument(skip(wal_dir), fields(wal_dir = ?wal_dir, max_segment_size = max_segment_size))]
22 pub fn new(wal_dir: &PathBuf, max_segment_size: u64, sync_mode: SyncMode) -> Result<Self> {
23 info!("Creating WAL writer");
24
25 std::fs::create_dir_all(wal_dir)
27 .map_err(|e| Error::Storage(format!("Failed to create WAL directory: {}", e)))?;
28
29 let starting_sequence = Self::find_max_sequence(wal_dir)?;
31
32 let segment_name = format!("wal-{:016x}.log", starting_sequence);
34 let segment_path = wal_dir.join(&segment_name);
35
36 let file = OpenOptions::new()
38 .create(true)
39 .append(true)
40 .open(&segment_path)
41 .map_err(|e| Error::Storage(format!("Failed to open WAL segment: {}", e)))?;
42
43 let current_size = file.metadata().map(|m| m.len()).unwrap_or(0);
45
46 Ok(Self {
47 file: BufWriter::new(file),
48 current_segment: segment_path,
49 current_size,
50 max_segment_size,
51 sync_mode,
52 sequence: starting_sequence,
53 wal_dir: wal_dir.clone(),
54 })
55 }
56
57 fn find_max_sequence(wal_dir: &PathBuf) -> Result<u64> {
59 let mut max_seq = 0u64;
60
61 if let Ok(entries) = std::fs::read_dir(wal_dir) {
62 for entry in entries.flatten() {
63 if let Some(name) = entry.file_name().to_str() {
64 if name.starts_with("wal-") && name.ends_with(".log") {
65 if let Some(seq_str) = name
67 .strip_prefix("wal-")
68 .and_then(|s| s.strip_suffix(".log"))
69 {
70 if let Ok(seq) = u64::from_str_radix(seq_str, 16) {
71 max_seq = max_seq.max(seq);
72 }
73 }
74 }
75 }
76 }
77 }
78
79 Ok(max_seq)
80 }
81
82 #[instrument(skip(self, record), fields(record_type = ?record))]
83 pub fn append(&mut self, record: WalRecord) -> Result<u64> {
84 debug!(sequence = self.sequence, "Appending WAL record");
85
86 let encoded = record.encode()?;
88 let record_size = encoded.len() as u64;
89
90 if self.current_size + record_size > self.max_segment_size {
92 self.rotate_segment()?;
93 }
94
95 self.file
97 .write_all(&encoded)
98 .map_err(|e| Error::Storage(format!("Failed to write WAL record: {}", e)))?;
99
100 self.current_size += record_size;
101 self.sequence += 1;
102
103 if matches!(self.sync_mode, SyncMode::Sync) {
105 self.sync()?;
106 }
107
108 Ok(self.sequence)
109 }
110
111 pub fn sync(&mut self) -> Result<()> {
112 self.file
113 .flush()
114 .map_err(|e| Error::Storage(format!("Failed to flush WAL: {}", e)))?;
115
116 self.file
117 .get_ref()
118 .sync_all()
119 .map_err(|e| Error::Storage(format!("Failed to sync WAL: {}", e)))?;
120
121 Ok(())
122 }
123
124 fn rotate_segment(&mut self) -> Result<()> {
125 self.sync()?;
127
128 let segment_name = format!("wal-{:016x}.log", self.sequence + 1);
130 let new_segment = self.wal_dir.join(&segment_name);
131
132 let file = OpenOptions::new()
134 .create(true)
135 .append(true)
136 .open(&new_segment)
137 .map_err(|e| Error::Storage(format!("Failed to create new segment: {}", e)))?;
138
139 self.file = BufWriter::new(file);
141 self.current_segment = new_segment;
142 self.current_size = 0;
143
144 Ok(())
145 }
146
147 pub fn current_segment_path(&self) -> &PathBuf {
149 &self.current_segment
150 }
151
152 pub fn sequence(&self) -> u64 {
154 self.sequence
155 }
156
157 pub fn current_segment_size(&self) -> u64 {
159 self.current_size
160 }
161}
162
163impl Drop for WalWriter {
164 fn drop(&mut self) {
165 let _ = self.sync();
167 }
168}
169
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Segment limit (64 MiB) used by tests that should never trigger rotation.
    const ROOMY_SEGMENT: u64 = 64 * 1024 * 1024;

    /// Creates a temporary directory containing a `wal` sub-directory.
    ///
    /// The `TempDir` guard is handed back so the directory lives as long as the test.
    fn setup_test_wal() -> (TempDir, PathBuf) {
        let guard = TempDir::new().expect("Failed to create temp dir");
        let dir = guard.path().join("wal");
        std::fs::create_dir_all(&dir).expect("Failed to create WAL dir");
        (guard, dir)
    }

    /// Opens a writer on `dir`, panicking with the shared failure message.
    fn open_writer(dir: &PathBuf, max_segment_size: u64, mode: SyncMode) -> WalWriter {
        WalWriter::new(dir, max_segment_size, mode).expect("Failed to create writer")
    }

    /// Builds the PUT record `key{i}` -> `value{i}`.
    fn numbered_put(i: u64) -> WalRecord {
        WalRecord::put(
            format!("key{}", i).into_bytes(),
            format!("value{}", i).into_bytes(),
        )
    }

    /// A fresh directory yields an existing segment file and sequence 0.
    #[test]
    fn test_writer_creation() {
        let (_guard, dir) = setup_test_wal();
        let writer = open_writer(&dir, ROOMY_SEGMENT, SyncMode::Sync);

        assert!(writer.current_segment_path().exists());
        assert_eq!(writer.sequence(), 0);
    }

    /// The first append returns sequence 1 and grows the segment.
    #[test]
    fn test_append_single_record() {
        let (_guard, dir) = setup_test_wal();
        let mut writer = open_writer(&dir, ROOMY_SEGMENT, SyncMode::Sync);

        let seq = writer
            .append(WalRecord::put(b"key1".to_vec(), b"value1".to_vec()))
            .expect("Failed to append");

        assert_eq!(seq, 1);
        assert!(writer.current_segment_size() > 0);
    }

    /// Consecutive appends return consecutive sequence numbers starting at 1.
    #[test]
    fn test_append_multiple_records() {
        let (_guard, dir) = setup_test_wal();
        let mut writer = open_writer(&dir, ROOMY_SEGMENT, SyncMode::Sync);

        for i in 0..10u64 {
            let seq = writer.append(numbered_put(i)).expect("Failed to append");
            assert_eq!(seq, i + 1);
        }
    }

    /// With a 100-byte limit, ten appends must move the writer to another segment
    /// and leave more than one `.log` file in the directory.
    #[test]
    fn test_segment_rotation() {
        let (_guard, dir) = setup_test_wal();
        let mut writer = open_writer(&dir, 100, SyncMode::Sync);

        let first_segment = writer.current_segment_path().clone();

        for i in 0..10u64 {
            writer.append(numbered_put(i)).expect("Failed to append");
        }

        assert_ne!(writer.current_segment_path(), &first_segment);

        let log_files = std::fs::read_dir(&dir)
            .expect("Failed to read dir")
            .filter_map(|entry| entry.ok())
            .filter(|entry| matches!(entry.path().extension(), Some(ext) if ext == "log"))
            .count();

        assert!(log_files > 1, "Expected multiple segments after rotation");
    }

    /// Appending and syncing succeeds under every sync mode.
    #[test]
    fn test_sync_modes() {
        for sync_mode in [SyncMode::Sync, SyncMode::Async, SyncMode::None] {
            let (_guard, dir) = setup_test_wal();
            let mut writer = open_writer(&dir, ROOMY_SEGMENT, sync_mode);

            writer
                .append(WalRecord::put(b"key".to_vec(), b"value".to_vec()))
                .expect("Failed to append");

            writer.sync().expect("Failed to sync");
        }
    }

    /// A second writer can be opened on a directory a previous writer has used.
    #[test]
    fn test_writer_resume_sequence() {
        let (_guard, dir) = setup_test_wal();

        let mut first = open_writer(&dir, ROOMY_SEGMENT, SyncMode::Sync);
        for i in 0..5u64 {
            first.append(numbered_put(i)).expect("Failed to append");
        }
        // Release the first writer before reopening the directory.
        drop(first);

        let reopened = open_writer(&dir, ROOMY_SEGMENT, SyncMode::Sync);

        assert!(reopened.current_segment_path().exists());
    }

    /// Each record constructor can be appended; five appends leave sequence at 5.
    #[test]
    fn test_different_record_types() {
        let (_guard, dir) = setup_test_wal();
        let mut writer = open_writer(&dir, ROOMY_SEGMENT, SyncMode::Sync);

        let put = WalRecord::put(b"key1".to_vec(), b"value1".to_vec());
        writer.append(put).expect("Failed to append PUT");

        let delete = WalRecord::delete(b"key2".to_vec());
        writer.append(delete).expect("Failed to append DELETE");

        let begin = WalRecord::begin_tx(1);
        writer.append(begin).expect("Failed to append BEGIN_TX");

        let commit = WalRecord::commit_tx(1);
        writer.append(commit).expect("Failed to append COMMIT_TX");

        let checkpoint = WalRecord::checkpoint(100);
        writer.append(checkpoint).expect("Failed to append CHECKPOINT");

        assert_eq!(writer.sequence(), 5);
    }

    /// A record with a 1 MiB value is accepted and accounted for in the segment size.
    #[test]
    fn test_large_record() {
        let (_guard, dir) = setup_test_wal();
        let mut writer = open_writer(&dir, ROOMY_SEGMENT, SyncMode::Sync);

        let payload = vec![0u8; 1024 * 1024];
        writer
            .append(WalRecord::put(b"large_key".to_vec(), payload))
            .expect("Failed to append large record");

        assert!(writer.current_segment_size() > 1024 * 1024);
    }
}