1use crate::record::WalRecord;
9use crate::writer::WalHeader;
10use rustlite_core::{Error, Result};
11use std::fs::File;
12use std::io::{BufReader, Read};
13use std::path::{Path, PathBuf};
14use tracing::debug;
15
16pub struct WalReader {
18 segments: Vec<PathBuf>,
20 current_segment_index: usize,
22 reader: Option<BufReader<File>>,
24 current_offset: u64,
26}
27
28impl WalReader {
29 pub fn new(wal_dir: &Path) -> Result<Self> {
31 let segments = Self::discover_segments(wal_dir)?;
32
33 let mut reader = Self {
34 segments,
35 current_segment_index: 0,
36 reader: None,
37 current_offset: 0,
38 };
39
40 if !reader.segments.is_empty() {
42 reader.open_segment(0)?;
43 }
44
45 Ok(reader)
46 }
47
48 fn discover_segments(wal_dir: &Path) -> Result<Vec<PathBuf>> {
50 if !wal_dir.exists() {
51 return Ok(Vec::new());
52 }
53
54 let mut segments: Vec<PathBuf> = std::fs::read_dir(wal_dir)
55 .map_err(|e| Error::Storage(format!("Failed to read WAL directory: {}", e)))?
56 .filter_map(|entry| entry.ok())
57 .map(|entry| entry.path())
58 .filter(|path| path.extension().map(|ext| ext == "log").unwrap_or(false))
59 .collect();
60
61 segments.sort();
63
64 Ok(segments)
65 }
66
67 fn open_segment(&mut self, index: usize) -> Result<()> {
69 if index >= self.segments.len() {
70 return Err(Error::Storage("Segment index out of bounds".to_string()));
71 }
72
73 let path = &self.segments[index];
74 let file = File::open(path)
75 .map_err(|e| Error::Storage(format!("Failed to open segment {:?}: {}", path, e)))?;
76
77 let mut reader = BufReader::new(file);
78
79 let header_offset = match WalHeader::read_from(&mut reader) {
82 Ok(header) => {
83 debug!(
84 segment = ?path,
85 version = header.version,
86 "Opened WAL segment with header"
87 );
88 WalHeader::SIZE as u64
89 }
90 Err(_) => {
91 let file = File::open(path).map_err(|e| {
93 Error::Storage(format!("Failed to reopen segment {:?}: {}", path, e))
94 })?;
95 reader = BufReader::new(file);
96 debug!(segment = ?path, "Opened legacy WAL segment (pre-v1.0)");
97 0
98 }
99 };
100
101 self.reader = Some(reader);
102 self.current_segment_index = index;
103 self.current_offset = header_offset;
104
105 Ok(())
106 }
107
108 fn advance_segment(&mut self) -> Result<bool> {
110 let next_index = self.current_segment_index + 1;
111 if next_index >= self.segments.len() {
112 self.reader = None;
113 return Ok(false);
114 }
115
116 self.open_segment(next_index)?;
117 Ok(true)
118 }
119
120 pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
126 loop {
127 let reader = match &mut self.reader {
128 Some(r) => r,
129 None => return Ok(None), };
131
132 match Self::read_record(reader) {
134 Ok(Some((record, bytes_read))) => {
135 self.current_offset += bytes_read as u64;
136 return Ok(Some(record));
137 }
138 Ok(None) => {
139 if !self.advance_segment()? {
141 return Ok(None);
142 }
143 }
145 Err(e) => {
146 if Self::is_truncation_error(&e) {
149 if !self.advance_segment()? {
151 return Ok(None);
152 }
153 } else {
155 return Err(e);
156 }
157 }
158 }
159 }
160 }
161
162 fn read_record(reader: &mut BufReader<File>) -> Result<Option<(WalRecord, usize)>> {
166 let mut len_buf = [0u8; 4];
168 match reader.read_exact(&mut len_buf) {
169 Ok(()) => {}
170 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
171 return Ok(None); }
173 Err(e) => {
174 return Err(Error::Storage(format!(
175 "Failed to read record length: {}",
176 e
177 )));
178 }
179 }
180
181 let content_len = u32::from_le_bytes(len_buf) as usize;
182
183 if content_len > 16 * 1024 * 1024 {
185 return Err(Error::Storage(format!(
186 "Record length too large: {} bytes",
187 content_len
188 )));
189 }
190
191 let total_data_len = content_len + 4; let mut data = vec![0u8; total_data_len];
194
195 reader.read_exact(&mut data).map_err(|e| {
196 if e.kind() == std::io::ErrorKind::UnexpectedEof {
197 Error::Serialization("Incomplete record: truncated".to_string())
198 } else {
199 Error::Storage(format!("Failed to read record data: {}", e))
200 }
201 })?;
202
203 let mut frame = Vec::with_capacity(4 + total_data_len);
205 frame.extend_from_slice(&len_buf);
206 frame.extend_from_slice(&data);
207
208 let (record, bytes_consumed) = WalRecord::decode(&frame)?;
210
211 Ok(Some((record, bytes_consumed)))
212 }
213
214 fn is_truncation_error(err: &Error) -> bool {
216 match err {
217 Error::Serialization(msg) => msg.contains("Incomplete") || msg.contains("truncated"),
218 _ => false,
219 }
220 }
221
222 pub fn segment_count(&self) -> usize {
224 self.segments.len()
225 }
226
227 pub fn current_segment(&self) -> usize {
229 self.current_segment_index
230 }
231
232 pub fn reset(&mut self) -> Result<()> {
234 if !self.segments.is_empty() {
235 self.open_segment(0)?;
236 } else {
237 self.reader = None;
238 self.current_segment_index = 0;
239 self.current_offset = 0;
240 }
241 Ok(())
242 }
243
244 pub fn seek_to_segment(&mut self, index: usize) -> Result<()> {
246 if index >= self.segments.len() {
247 return Err(Error::Storage(format!(
248 "Segment index {} out of range (have {} segments)",
249 index,
250 self.segments.len()
251 )));
252 }
253 self.open_segment(index)
254 }
255
256 pub fn read_all(&mut self) -> Result<Vec<WalRecord>> {
258 let mut records = Vec::new();
259 while let Some(record) = self.next_record()? {
260 records.push(record);
261 }
262 Ok(records)
263 }
264}
265
266impl Iterator for WalReader {
268 type Item = Result<WalRecord>;
269
270 fn next(&mut self) -> Option<Self::Item> {
271 match self.next_record() {
272 Ok(Some(record)) => Some(Ok(record)),
273 Ok(None) => None,
274 Err(e) => Some(Err(e)),
275 }
276 }
277}
278
279#[cfg(test)]
280mod tests {
281 use super::*;
282 use crate::{SyncMode, WalWriter};
283 use tempfile::TempDir;
284
285 fn setup_test_wal() -> (TempDir, PathBuf) {
286 let temp_dir = TempDir::new().expect("Failed to create temp dir");
287 let wal_path = temp_dir.path().join("wal");
288 std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");
289 (temp_dir, wal_path)
290 }
291
292 #[test]
293 fn test_empty_wal_reader() {
294 let (_temp_dir, wal_path) = setup_test_wal();
295
296 let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
297 assert_eq!(reader.segment_count(), 0);
298 assert!(reader.next_record().unwrap().is_none());
299 }
300
301 #[test]
302 fn test_read_single_record() {
303 let (_temp_dir, wal_path) = setup_test_wal();
304
305 {
307 let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
308 .expect("Failed to create writer");
309 let record = WalRecord::put(b"key1".to_vec(), b"value1".to_vec());
310 writer.append(record).expect("Failed to append");
311 writer.sync().expect("Failed to sync");
312 }
313
314 let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
316 assert_eq!(reader.segment_count(), 1);
317
318 let record = reader.next().unwrap().expect("Expected a record");
319 match &record.payload {
320 crate::record::RecordPayload::Put { key, value } => {
321 assert_eq!(key, b"key1");
322 assert_eq!(value, b"value1");
323 }
324 _ => panic!("Expected Put record"),
325 }
326
327 assert!(reader.next_record().unwrap().is_none());
328 }
329
330 #[test]
331 fn test_read_multiple_records() {
332 let (_temp_dir, wal_path) = setup_test_wal();
333
334 {
336 let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
337 .expect("Failed to create writer");
338
339 for i in 0..10 {
340 let record = WalRecord::put(
341 format!("key{}", i).into_bytes(),
342 format!("value{}", i).into_bytes(),
343 );
344 writer.append(record).expect("Failed to append");
345 }
346 writer.sync().expect("Failed to sync");
347 }
348
349 let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
351 let records = reader.read_all().expect("Failed to read all");
352
353 assert_eq!(records.len(), 10);
354 }
355
356 #[test]
357 fn test_read_across_segment_rotation() {
358 let (_temp_dir, wal_path) = setup_test_wal();
359
360 {
362 let mut writer =
363 WalWriter::new(&wal_path, 100, SyncMode::Sync).expect("Failed to create writer");
364
365 for i in 0..20 {
366 let record = WalRecord::put(
367 format!("key{}", i).into_bytes(),
368 format!("value{}", i).into_bytes(),
369 );
370 writer.append(record).expect("Failed to append");
371 }
372 writer.sync().expect("Failed to sync");
373 }
374
375 let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
377 let records = reader.read_all().expect("Failed to read all");
378
379 assert_eq!(records.len(), 20);
380 assert!(reader.segment_count() > 1, "Expected multiple segments");
382 }
383
384 #[test]
385 fn test_reader_reset() {
386 let (_temp_dir, wal_path) = setup_test_wal();
387
388 {
390 let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
391 .expect("Failed to create writer");
392
393 for i in 0..5 {
394 let record = WalRecord::put(
395 format!("key{}", i).into_bytes(),
396 format!("value{}", i).into_bytes(),
397 );
398 writer.append(record).expect("Failed to append");
399 }
400 writer.sync().expect("Failed to sync");
401 }
402
403 let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
404
405 let first_read = reader.read_all().expect("Failed to read all");
407 assert_eq!(first_read.len(), 5);
408
409 reader.reset().expect("Failed to reset");
411 let second_read = reader.read_all().expect("Failed to read all");
412 assert_eq!(second_read.len(), 5);
413 }
414
415 #[test]
416 fn test_reader_with_transaction_markers() {
417 let (_temp_dir, wal_path) = setup_test_wal();
418
419 {
421 let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
422 .expect("Failed to create writer");
423
424 writer
425 .append(WalRecord::begin_tx(1))
426 .expect("Failed to append");
427 writer
428 .append(WalRecord::put(b"key1".to_vec(), b"val1".to_vec()))
429 .expect("Failed to append");
430 writer
431 .append(WalRecord::put(b"key2".to_vec(), b"val2".to_vec()))
432 .expect("Failed to append");
433 writer
434 .append(WalRecord::commit_tx(1))
435 .expect("Failed to append");
436 writer.sync().expect("Failed to sync");
437 }
438
439 let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
440 let records = reader.read_all().expect("Failed to read all");
441
442 assert_eq!(records.len(), 4);
443 assert_eq!(records[0].record_type, crate::RecordType::BeginTx);
444 assert_eq!(records[1].record_type, crate::RecordType::Put);
445 assert_eq!(records[2].record_type, crate::RecordType::Put);
446 assert_eq!(records[3].record_type, crate::RecordType::CommitTx);
447 }
448}