1use crate::record::WalRecord;
3use crate::SyncMode;
4use rustlite_core::{Error, Result};
5use std::fs::{File, OpenOptions};
6use std::io::{BufWriter, Write};
7use std::path::PathBuf;
8
/// Append-only writer for write-ahead-log (WAL) segment files.
///
/// Records are written through a buffered handle to the current segment file
/// inside `wal_dir`. Segment files are named `wal-<16 hex digits>.log`.
pub struct WalWriter {
    /// Buffered handle to the segment file currently being appended to.
    file: BufWriter<File>,
    /// Path of the segment file behind `file`.
    current_segment: PathBuf,
    /// Size in bytes tracked for the current segment; compared against
    /// `max_segment_size` to decide when to rotate.
    current_size: u64,
    /// Size threshold (in bytes) above which a new segment is started.
    max_segment_size: u64,
    /// Controls whether every append is followed by a sync to disk.
    sync_mode: SyncMode,
    /// Counter incremented once per appended record; also used to derive the
    /// file name of newly created segments.
    sequence: u64,
    /// Directory that holds all segment files.
    wal_dir: PathBuf,
}
18
19impl WalWriter {
20 pub fn new(wal_dir: &PathBuf, max_segment_size: u64, sync_mode: SyncMode) -> Result<Self> {
21 std::fs::create_dir_all(wal_dir)
23 .map_err(|e| Error::Storage(format!("Failed to create WAL directory: {}", e)))?;
24
25 let starting_sequence = Self::find_max_sequence(wal_dir)?;
27
28 let segment_name = format!("wal-{:016x}.log", starting_sequence);
30 let segment_path = wal_dir.join(&segment_name);
31
32 let file = OpenOptions::new()
34 .create(true)
35 .append(true)
36 .open(&segment_path)
37 .map_err(|e| Error::Storage(format!("Failed to open WAL segment: {}", e)))?;
38
39 let current_size = file.metadata().map(|m| m.len()).unwrap_or(0);
41
42 Ok(Self {
43 file: BufWriter::new(file),
44 current_segment: segment_path,
45 current_size,
46 max_segment_size,
47 sync_mode,
48 sequence: starting_sequence,
49 wal_dir: wal_dir.clone(),
50 })
51 }
52
53 fn find_max_sequence(wal_dir: &PathBuf) -> Result<u64> {
55 let mut max_seq = 0u64;
56
57 if let Ok(entries) = std::fs::read_dir(wal_dir) {
58 for entry in entries.flatten() {
59 if let Some(name) = entry.file_name().to_str() {
60 if name.starts_with("wal-") && name.ends_with(".log") {
61 if let Some(seq_str) = name
63 .strip_prefix("wal-")
64 .and_then(|s| s.strip_suffix(".log"))
65 {
66 if let Ok(seq) = u64::from_str_radix(seq_str, 16) {
67 max_seq = max_seq.max(seq);
68 }
69 }
70 }
71 }
72 }
73 }
74
75 Ok(max_seq)
76 }
77
78 pub fn append(&mut self, record: WalRecord) -> Result<u64> {
79 let encoded = record.encode()?;
81 let record_size = encoded.len() as u64;
82
83 if self.current_size + record_size > self.max_segment_size {
85 self.rotate_segment()?;
86 }
87
88 self.file
90 .write_all(&encoded)
91 .map_err(|e| Error::Storage(format!("Failed to write WAL record: {}", e)))?;
92
93 self.current_size += record_size;
94 self.sequence += 1;
95
96 if matches!(self.sync_mode, SyncMode::Sync) {
98 self.sync()?;
99 }
100
101 Ok(self.sequence)
102 }
103
104 pub fn sync(&mut self) -> Result<()> {
105 self.file
106 .flush()
107 .map_err(|e| Error::Storage(format!("Failed to flush WAL: {}", e)))?;
108
109 self.file
110 .get_ref()
111 .sync_all()
112 .map_err(|e| Error::Storage(format!("Failed to sync WAL: {}", e)))?;
113
114 Ok(())
115 }
116
117 fn rotate_segment(&mut self) -> Result<()> {
118 self.sync()?;
120
121 let segment_name = format!("wal-{:016x}.log", self.sequence + 1);
123 let new_segment = self.wal_dir.join(&segment_name);
124
125 let file = OpenOptions::new()
127 .create(true)
128 .append(true)
129 .open(&new_segment)
130 .map_err(|e| Error::Storage(format!("Failed to create new segment: {}", e)))?;
131
132 self.file = BufWriter::new(file);
134 self.current_segment = new_segment;
135 self.current_size = 0;
136
137 Ok(())
138 }
139
140 pub fn current_segment_path(&self) -> &PathBuf {
142 &self.current_segment
143 }
144
145 pub fn sequence(&self) -> u64 {
147 self.sequence
148 }
149
150 pub fn current_segment_size(&self) -> u64 {
152 self.current_size
153 }
154}
155
156impl Drop for WalWriter {
157 fn drop(&mut self) {
158 let _ = self.sync();
160 }
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Segment size limit shared by the tests that are not about rotation.
    const LARGE_SEGMENT: u64 = 64 * 1024 * 1024;

    /// Creates a temporary directory containing a `wal` subdirectory.
    ///
    /// The `TempDir` guard is returned alongside the path so the directory
    /// stays alive for the duration of the test.
    fn setup_test_wal() -> (TempDir, PathBuf) {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let wal_path = temp_dir.path().join("wal");
        std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");
        (temp_dir, wal_path)
    }

    /// Opens a writer on `wal_path`, panicking if construction fails.
    fn open_writer(wal_path: &PathBuf, max_segment_size: u64, sync_mode: SyncMode) -> WalWriter {
        WalWriter::new(wal_path, max_segment_size, sync_mode).expect("Failed to create writer")
    }

    /// Builds the PUT record `key<i>` -> `value<i>`.
    fn numbered_put(i: u64) -> WalRecord {
        WalRecord::put(
            format!("key{}", i).into_bytes(),
            format!("value{}", i).into_bytes(),
        )
    }

    /// A fresh writer creates its segment file and starts at sequence 0.
    #[test]
    fn test_writer_creation() {
        let (_temp_dir, wal_path) = setup_test_wal();
        let writer = open_writer(&wal_path, LARGE_SEGMENT, SyncMode::Sync);

        assert!(writer.current_segment_path().exists());
        assert_eq!(writer.sequence(), 0);
    }

    /// The first appended record gets sequence 1 and grows the segment.
    #[test]
    fn test_append_single_record() {
        let (_temp_dir, wal_path) = setup_test_wal();
        let mut writer = open_writer(&wal_path, LARGE_SEGMENT, SyncMode::Sync);

        let seq = writer
            .append(WalRecord::put(b"key1".to_vec(), b"value1".to_vec()))
            .expect("Failed to append");

        assert_eq!(seq, 1);
        assert!(writer.current_segment_size() > 0);
    }

    /// Consecutive appends return consecutive sequence numbers.
    #[test]
    fn test_append_multiple_records() {
        let (_temp_dir, wal_path) = setup_test_wal();
        let mut writer = open_writer(&wal_path, LARGE_SEGMENT, SyncMode::Sync);

        for i in 0..10u64 {
            let seq = writer.append(numbered_put(i)).expect("Failed to append");
            assert_eq!(seq, i + 1);
        }
    }

    /// With a tiny segment limit, appending several records moves the writer
    /// to a new segment and leaves more than one `.log` file on disk.
    #[test]
    fn test_segment_rotation() {
        let (_temp_dir, wal_path) = setup_test_wal();
        let mut writer = open_writer(&wal_path, 100, SyncMode::Sync);
        let initial_segment = writer.current_segment_path().clone();

        for i in 0..10u64 {
            writer.append(numbered_put(i)).expect("Failed to append");
        }

        assert_ne!(writer.current_segment_path(), &initial_segment);

        let log_file_count = std::fs::read_dir(&wal_path)
            .expect("Failed to read dir")
            .filter_map(|entry| entry.ok())
            .filter(|entry| entry.path().extension().map_or(false, |ext| ext == "log"))
            .count();

        assert!(
            log_file_count > 1,
            "Expected multiple segments after rotation"
        );
    }

    /// Appending and explicitly syncing succeeds in every sync mode.
    #[test]
    fn test_sync_modes() {
        for sync_mode in [SyncMode::Sync, SyncMode::Async, SyncMode::None] {
            let (_temp_dir, wal_path) = setup_test_wal();
            let mut writer = open_writer(&wal_path, LARGE_SEGMENT, sync_mode);

            writer
                .append(WalRecord::put(b"key".to_vec(), b"value".to_vec()))
                .expect("Failed to append");

            writer.sync().expect("Failed to sync");
        }
    }

    /// A second writer can be opened on a directory a previous writer used.
    #[test]
    fn test_writer_resume_sequence() {
        let (_temp_dir, wal_path) = setup_test_wal();

        let mut first = open_writer(&wal_path, LARGE_SEGMENT, SyncMode::Sync);
        for i in 0..5u64 {
            first.append(numbered_put(i)).expect("Failed to append");
        }
        // Close the first writer before reopening the directory.
        drop(first);

        let reopened = open_writer(&wal_path, LARGE_SEGMENT, SyncMode::Sync);
        assert!(reopened.current_segment_path().exists());
    }

    /// Every record kind can be appended; five appends yield sequence 5.
    #[test]
    fn test_different_record_types() {
        let (_temp_dir, wal_path) = setup_test_wal();
        let mut writer = open_writer(&wal_path, LARGE_SEGMENT, SyncMode::Sync);

        let mut append_or_panic = |record: WalRecord, failure: &str| {
            writer.append(record).expect(failure);
        };

        append_or_panic(
            WalRecord::put(b"key1".to_vec(), b"value1".to_vec()),
            "Failed to append PUT",
        );
        append_or_panic(
            WalRecord::delete(b"key2".to_vec()),
            "Failed to append DELETE",
        );
        append_or_panic(WalRecord::begin_tx(1), "Failed to append BEGIN_TX");
        append_or_panic(WalRecord::commit_tx(1), "Failed to append COMMIT_TX");
        append_or_panic(WalRecord::checkpoint(100), "Failed to append CHECKPOINT");

        assert_eq!(writer.sequence(), 5);
    }

    /// A record with a 1 MiB value is accepted and accounted for in the
    /// segment size.
    #[test]
    fn test_large_record() {
        let (_temp_dir, wal_path) = setup_test_wal();
        let mut writer = open_writer(&wal_path, LARGE_SEGMENT, SyncMode::Sync);

        let one_mib = 1024 * 1024;
        let record = WalRecord::put(b"large_key".to_vec(), vec![0u8; one_mib]);

        writer
            .append(record)
            .expect("Failed to append large record");

        assert!(writer.current_segment_size() > 1024 * 1024);
    }
}