1use crate::record::WalRecord;
9use rustlite_core::{Error, Result};
10use std::fs::File;
11use std::io::{BufReader, Read};
12use std::path::{Path, PathBuf};
13
14pub struct WalReader {
16 segments: Vec<PathBuf>,
18 current_segment_index: usize,
20 reader: Option<BufReader<File>>,
22 current_offset: u64,
24}
25
26impl WalReader {
27 pub fn new(wal_dir: &Path) -> Result<Self> {
29 let segments = Self::discover_segments(wal_dir)?;
30
31 let mut reader = Self {
32 segments,
33 current_segment_index: 0,
34 reader: None,
35 current_offset: 0,
36 };
37
38 if !reader.segments.is_empty() {
40 reader.open_segment(0)?;
41 }
42
43 Ok(reader)
44 }
45
46 fn discover_segments(wal_dir: &Path) -> Result<Vec<PathBuf>> {
48 if !wal_dir.exists() {
49 return Ok(Vec::new());
50 }
51
52 let mut segments: Vec<PathBuf> = std::fs::read_dir(wal_dir)
53 .map_err(|e| Error::Storage(format!("Failed to read WAL directory: {}", e)))?
54 .filter_map(|entry| entry.ok())
55 .map(|entry| entry.path())
56 .filter(|path| path.extension().map(|ext| ext == "log").unwrap_or(false))
57 .collect();
58
59 segments.sort();
61
62 Ok(segments)
63 }
64
65 fn open_segment(&mut self, index: usize) -> Result<()> {
67 if index >= self.segments.len() {
68 return Err(Error::Storage("Segment index out of bounds".to_string()));
69 }
70
71 let path = &self.segments[index];
72 let file = File::open(path)
73 .map_err(|e| Error::Storage(format!("Failed to open segment {:?}: {}", path, e)))?;
74
75 self.reader = Some(BufReader::new(file));
76 self.current_segment_index = index;
77 self.current_offset = 0;
78
79 Ok(())
80 }
81
82 fn advance_segment(&mut self) -> Result<bool> {
84 let next_index = self.current_segment_index + 1;
85 if next_index >= self.segments.len() {
86 self.reader = None;
87 return Ok(false);
88 }
89
90 self.open_segment(next_index)?;
91 Ok(true)
92 }
93
94 pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
100 loop {
101 let reader = match &mut self.reader {
102 Some(r) => r,
103 None => return Ok(None), };
105
106 match Self::read_record(reader) {
108 Ok(Some((record, bytes_read))) => {
109 self.current_offset += bytes_read as u64;
110 return Ok(Some(record));
111 }
112 Ok(None) => {
113 if !self.advance_segment()? {
115 return Ok(None);
116 }
117 }
119 Err(e) => {
120 if Self::is_truncation_error(&e) {
123 if !self.advance_segment()? {
125 return Ok(None);
126 }
127 } else {
129 return Err(e);
130 }
131 }
132 }
133 }
134 }
135
136 fn read_record(reader: &mut BufReader<File>) -> Result<Option<(WalRecord, usize)>> {
140 let mut len_buf = [0u8; 4];
142 match reader.read_exact(&mut len_buf) {
143 Ok(()) => {}
144 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
145 return Ok(None); }
147 Err(e) => {
148 return Err(Error::Storage(format!(
149 "Failed to read record length: {}",
150 e
151 )));
152 }
153 }
154
155 let content_len = u32::from_le_bytes(len_buf) as usize;
156
157 if content_len > 16 * 1024 * 1024 {
159 return Err(Error::Storage(format!(
160 "Record length too large: {} bytes",
161 content_len
162 )));
163 }
164
165 let total_data_len = content_len + 4; let mut data = vec![0u8; total_data_len];
168
169 reader.read_exact(&mut data).map_err(|e| {
170 if e.kind() == std::io::ErrorKind::UnexpectedEof {
171 Error::Serialization("Incomplete record: truncated".to_string())
172 } else {
173 Error::Storage(format!("Failed to read record data: {}", e))
174 }
175 })?;
176
177 let mut frame = Vec::with_capacity(4 + total_data_len);
179 frame.extend_from_slice(&len_buf);
180 frame.extend_from_slice(&data);
181
182 let (record, bytes_consumed) = WalRecord::decode(&frame)?;
184
185 Ok(Some((record, bytes_consumed)))
186 }
187
188 fn is_truncation_error(err: &Error) -> bool {
190 match err {
191 Error::Serialization(msg) => msg.contains("Incomplete") || msg.contains("truncated"),
192 _ => false,
193 }
194 }
195
196 pub fn segment_count(&self) -> usize {
198 self.segments.len()
199 }
200
201 pub fn current_segment(&self) -> usize {
203 self.current_segment_index
204 }
205
206 pub fn reset(&mut self) -> Result<()> {
208 if !self.segments.is_empty() {
209 self.open_segment(0)?;
210 } else {
211 self.reader = None;
212 self.current_segment_index = 0;
213 self.current_offset = 0;
214 }
215 Ok(())
216 }
217
218 pub fn seek_to_segment(&mut self, index: usize) -> Result<()> {
220 if index >= self.segments.len() {
221 return Err(Error::Storage(format!(
222 "Segment index {} out of range (have {} segments)",
223 index,
224 self.segments.len()
225 )));
226 }
227 self.open_segment(index)
228 }
229
230 pub fn read_all(&mut self) -> Result<Vec<WalRecord>> {
232 let mut records = Vec::new();
233 while let Some(record) = self.next_record()? {
234 records.push(record);
235 }
236 Ok(records)
237 }
238}
239
240impl Iterator for WalReader {
242 type Item = Result<WalRecord>;
243
244 fn next(&mut self) -> Option<Self::Item> {
245 match self.next_record() {
246 Ok(Some(record)) => Some(Ok(record)),
247 Ok(None) => None,
248 Err(e) => Some(Err(e)),
249 }
250 }
251}
252
253#[cfg(test)]
254mod tests {
255 use super::*;
256 use crate::{SyncMode, WalWriter};
257 use tempfile::TempDir;
258
259 fn setup_test_wal() -> (TempDir, PathBuf) {
260 let temp_dir = TempDir::new().expect("Failed to create temp dir");
261 let wal_path = temp_dir.path().join("wal");
262 std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");
263 (temp_dir, wal_path)
264 }
265
266 #[test]
267 fn test_empty_wal_reader() {
268 let (_temp_dir, wal_path) = setup_test_wal();
269
270 let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
271 assert_eq!(reader.segment_count(), 0);
272 assert!(reader.next_record().unwrap().is_none());
273 }
274
275 #[test]
276 fn test_read_single_record() {
277 let (_temp_dir, wal_path) = setup_test_wal();
278
279 {
281 let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
282 .expect("Failed to create writer");
283 let record = WalRecord::put(b"key1".to_vec(), b"value1".to_vec());
284 writer.append(record).expect("Failed to append");
285 writer.sync().expect("Failed to sync");
286 }
287
288 let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
290 assert_eq!(reader.segment_count(), 1);
291
292 let record = reader.next().unwrap().expect("Expected a record");
293 match &record.payload {
294 crate::record::RecordPayload::Put { key, value } => {
295 assert_eq!(key, b"key1");
296 assert_eq!(value, b"value1");
297 }
298 _ => panic!("Expected Put record"),
299 }
300
301 assert!(reader.next_record().unwrap().is_none());
302 }
303
304 #[test]
305 fn test_read_multiple_records() {
306 let (_temp_dir, wal_path) = setup_test_wal();
307
308 {
310 let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
311 .expect("Failed to create writer");
312
313 for i in 0..10 {
314 let record = WalRecord::put(
315 format!("key{}", i).into_bytes(),
316 format!("value{}", i).into_bytes(),
317 );
318 writer.append(record).expect("Failed to append");
319 }
320 writer.sync().expect("Failed to sync");
321 }
322
323 let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
325 let records = reader.read_all().expect("Failed to read all");
326
327 assert_eq!(records.len(), 10);
328 }
329
330 #[test]
331 fn test_read_across_segment_rotation() {
332 let (_temp_dir, wal_path) = setup_test_wal();
333
334 {
336 let mut writer =
337 WalWriter::new(&wal_path, 100, SyncMode::Sync).expect("Failed to create writer");
338
339 for i in 0..20 {
340 let record = WalRecord::put(
341 format!("key{}", i).into_bytes(),
342 format!("value{}", i).into_bytes(),
343 );
344 writer.append(record).expect("Failed to append");
345 }
346 writer.sync().expect("Failed to sync");
347 }
348
349 let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
351 let records = reader.read_all().expect("Failed to read all");
352
353 assert_eq!(records.len(), 20);
354 assert!(reader.segment_count() > 1, "Expected multiple segments");
356 }
357
358 #[test]
359 fn test_reader_reset() {
360 let (_temp_dir, wal_path) = setup_test_wal();
361
362 {
364 let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
365 .expect("Failed to create writer");
366
367 for i in 0..5 {
368 let record = WalRecord::put(
369 format!("key{}", i).into_bytes(),
370 format!("value{}", i).into_bytes(),
371 );
372 writer.append(record).expect("Failed to append");
373 }
374 writer.sync().expect("Failed to sync");
375 }
376
377 let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
378
379 let first_read = reader.read_all().expect("Failed to read all");
381 assert_eq!(first_read.len(), 5);
382
383 reader.reset().expect("Failed to reset");
385 let second_read = reader.read_all().expect("Failed to read all");
386 assert_eq!(second_read.len(), 5);
387 }
388
389 #[test]
390 fn test_reader_with_transaction_markers() {
391 let (_temp_dir, wal_path) = setup_test_wal();
392
393 {
395 let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
396 .expect("Failed to create writer");
397
398 writer
399 .append(WalRecord::begin_tx(1))
400 .expect("Failed to append");
401 writer
402 .append(WalRecord::put(b"key1".to_vec(), b"val1".to_vec()))
403 .expect("Failed to append");
404 writer
405 .append(WalRecord::put(b"key2".to_vec(), b"val2".to_vec()))
406 .expect("Failed to append");
407 writer
408 .append(WalRecord::commit_tx(1))
409 .expect("Failed to append");
410 writer.sync().expect("Failed to sync");
411 }
412
413 let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
414 let records = reader.read_all().expect("Failed to read all");
415
416 assert_eq!(records.len(), 4);
417 assert_eq!(records[0].record_type, crate::RecordType::BeginTx);
418 assert_eq!(records[1].record_type, crate::RecordType::Put);
419 assert_eq!(records[2].record_type, crate::RecordType::Put);
420 assert_eq!(records[3].record_type, crate::RecordType::CommitTx);
421 }
422}