1use crate::record::WalRecord;
9use rustlite_core::{Error, Result};
10use std::fs::File;
11use std::io::{BufReader, Read};
12use std::path::{Path, PathBuf};
13
/// Sequential reader over the `.log` segment files of a write-ahead-log directory.
///
/// Records are produced one at a time, moving from one segment file to the next in the
/// order stored in `segments`.
#[derive(Debug)]
pub struct WalReader {
    /// Paths of the segment files this reader walks, in read order.
    segments: Vec<PathBuf>,
    /// Index into `segments` of the segment that was opened most recently.
    current_segment_index: usize,
    /// Buffered handle on the currently open segment; `None` when no segment is open.
    reader: Option<BufReader<File>>,
    /// Bytes decoded so far from the current segment; set back to zero whenever a
    /// segment is opened.
    current_offset: u64,
}
25
26impl WalReader {
27 pub fn new(wal_dir: &Path) -> Result<Self> {
29 let segments = Self::discover_segments(wal_dir)?;
30
31 let mut reader = Self {
32 segments,
33 current_segment_index: 0,
34 reader: None,
35 current_offset: 0,
36 };
37
38 if !reader.segments.is_empty() {
40 reader.open_segment(0)?;
41 }
42
43 Ok(reader)
44 }
45
46 fn discover_segments(wal_dir: &Path) -> Result<Vec<PathBuf>> {
48 if !wal_dir.exists() {
49 return Ok(Vec::new());
50 }
51
52 let mut segments: Vec<PathBuf> = std::fs::read_dir(wal_dir)
53 .map_err(|e| Error::Storage(format!("Failed to read WAL directory: {}", e)))?
54 .filter_map(|entry| entry.ok())
55 .map(|entry| entry.path())
56 .filter(|path| {
57 path.extension()
58 .map(|ext| ext == "log")
59 .unwrap_or(false)
60 })
61 .collect();
62
63 segments.sort();
65
66 Ok(segments)
67 }
68
69 fn open_segment(&mut self, index: usize) -> Result<()> {
71 if index >= self.segments.len() {
72 return Err(Error::Storage("Segment index out of bounds".to_string()));
73 }
74
75 let path = &self.segments[index];
76 let file = File::open(path)
77 .map_err(|e| Error::Storage(format!("Failed to open segment {:?}: {}", path, e)))?;
78
79 self.reader = Some(BufReader::new(file));
80 self.current_segment_index = index;
81 self.current_offset = 0;
82
83 Ok(())
84 }
85
86 fn advance_segment(&mut self) -> Result<bool> {
88 let next_index = self.current_segment_index + 1;
89 if next_index >= self.segments.len() {
90 self.reader = None;
91 return Ok(false);
92 }
93
94 self.open_segment(next_index)?;
95 Ok(true)
96 }
97
98 pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
104 loop {
105 let reader = match &mut self.reader {
106 Some(r) => r,
107 None => return Ok(None), };
109
110 match Self::read_record(reader) {
112 Ok(Some((record, bytes_read))) => {
113 self.current_offset += bytes_read as u64;
114 return Ok(Some(record));
115 }
116 Ok(None) => {
117 if !self.advance_segment()? {
119 return Ok(None);
120 }
121 }
123 Err(e) => {
124 if Self::is_truncation_error(&e) {
127 if !self.advance_segment()? {
129 return Ok(None);
130 }
131 } else {
133 return Err(e);
134 }
135 }
136 }
137 }
138 }
139
140 fn read_record(reader: &mut BufReader<File>) -> Result<Option<(WalRecord, usize)>> {
144 let mut len_buf = [0u8; 4];
146 match reader.read_exact(&mut len_buf) {
147 Ok(()) => {}
148 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
149 return Ok(None); }
151 Err(e) => {
152 return Err(Error::Storage(format!("Failed to read record length: {}", e)));
153 }
154 }
155
156 let content_len = u32::from_le_bytes(len_buf) as usize;
157
158 if content_len > 16 * 1024 * 1024 {
160 return Err(Error::Storage(format!(
161 "Record length too large: {} bytes",
162 content_len
163 )));
164 }
165
166 let total_data_len = content_len + 4; let mut data = vec![0u8; total_data_len];
169
170 reader.read_exact(&mut data).map_err(|e| {
171 if e.kind() == std::io::ErrorKind::UnexpectedEof {
172 Error::Serialization("Incomplete record: truncated".to_string())
173 } else {
174 Error::Storage(format!("Failed to read record data: {}", e))
175 }
176 })?;
177
178 let mut frame = Vec::with_capacity(4 + total_data_len);
180 frame.extend_from_slice(&len_buf);
181 frame.extend_from_slice(&data);
182
183 let (record, bytes_consumed) = WalRecord::decode(&frame)?;
185
186 Ok(Some((record, bytes_consumed)))
187 }
188
189 fn is_truncation_error(err: &Error) -> bool {
191 match err {
192 Error::Serialization(msg) => msg.contains("Incomplete") || msg.contains("truncated"),
193 _ => false,
194 }
195 }
196
197 pub fn segment_count(&self) -> usize {
199 self.segments.len()
200 }
201
202 pub fn current_segment(&self) -> usize {
204 self.current_segment_index
205 }
206
207 pub fn reset(&mut self) -> Result<()> {
209 if !self.segments.is_empty() {
210 self.open_segment(0)?;
211 } else {
212 self.reader = None;
213 self.current_segment_index = 0;
214 self.current_offset = 0;
215 }
216 Ok(())
217 }
218
219 pub fn seek_to_segment(&mut self, index: usize) -> Result<()> {
221 if index >= self.segments.len() {
222 return Err(Error::Storage(format!(
223 "Segment index {} out of range (have {} segments)",
224 index,
225 self.segments.len()
226 )));
227 }
228 self.open_segment(index)
229 }
230
231 pub fn read_all(&mut self) -> Result<Vec<WalRecord>> {
233 let mut records = Vec::new();
234 while let Some(record) = self.next_record()? {
235 records.push(record);
236 }
237 Ok(records)
238 }
239}
240
241impl Iterator for WalReader {
243 type Item = Result<WalRecord>;
244
245 fn next(&mut self) -> Option<Self::Item> {
246 match self.next_record() {
247 Ok(Some(record)) => Some(Ok(record)),
248 Ok(None) => None,
249 Err(e) => Some(Err(e)),
250 }
251 }
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{SyncMode, WalWriter};
    use tempfile::TempDir;

    /// Creates a temporary directory with an empty `wal` subdirectory inside it.
    ///
    /// The `TempDir` handle is returned alongside the path so the caller can keep it
    /// alive for the duration of the test.
    fn setup_test_wal() -> (TempDir, PathBuf) {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let wal_path = temp_dir.path().join("wal");
        std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");
        (temp_dir, wal_path)
    }

    /// Appends `count` put records of the form `key{i}` -> `value{i}` to `writer`.
    fn append_numbered_puts(writer: &mut WalWriter, count: usize) {
        for i in 0..count {
            let key = format!("key{}", i).into_bytes();
            let value = format!("value{}", i).into_bytes();
            writer
                .append(WalRecord::put(key, value))
                .expect("Failed to append");
        }
    }

    /// A directory without segments gives a reader that yields nothing.
    #[test]
    fn test_empty_wal_reader() {
        let (_temp_dir, wal_path) = setup_test_wal();

        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");

        assert_eq!(reader.segment_count(), 0);
        assert!(reader.next_record().unwrap().is_none());
    }

    /// One written record is read back with the same key and value.
    #[test]
    fn test_read_single_record() {
        let (_temp_dir, wal_path) = setup_test_wal();

        // The writer lives in its own scope so it is dropped before reading starts.
        {
            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
                .expect("Failed to create writer");
            writer
                .append(WalRecord::put(b"key1".to_vec(), b"value1".to_vec()))
                .expect("Failed to append");
            writer.sync().expect("Failed to sync");
        }

        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
        assert_eq!(reader.segment_count(), 1);

        let record = reader.next().unwrap().expect("Expected a record");
        if let crate::record::RecordPayload::Put { key, value } = &record.payload {
            assert_eq!(key, b"key1");
            assert_eq!(value, b"value1");
        } else {
            panic!("Expected Put record");
        }

        assert!(reader.next_record().unwrap().is_none());
    }

    /// Ten records written to one segment are all returned by `read_all`.
    #[test]
    fn test_read_multiple_records() {
        let (_temp_dir, wal_path) = setup_test_wal();

        {
            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
                .expect("Failed to create writer");
            append_numbered_puts(&mut writer, 10);
            writer.sync().expect("Failed to sync");
        }

        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
        let records = reader.read_all().expect("Failed to read all");

        assert_eq!(records.len(), 10);
    }

    /// With a tiny segment size the records span several segments and are all read.
    #[test]
    fn test_read_across_segment_rotation() {
        let (_temp_dir, wal_path) = setup_test_wal();

        {
            let mut writer =
                WalWriter::new(&wal_path, 100, SyncMode::Sync).expect("Failed to create writer");
            append_numbered_puts(&mut writer, 20);
            writer.sync().expect("Failed to sync");
        }

        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
        let records = reader.read_all().expect("Failed to read all");

        assert_eq!(records.len(), 20);
        assert!(reader.segment_count() > 1, "Expected multiple segments");
    }

    /// After `reset` the same records can be read a second time.
    #[test]
    fn test_reader_reset() {
        let (_temp_dir, wal_path) = setup_test_wal();

        {
            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
                .expect("Failed to create writer");
            append_numbered_puts(&mut writer, 5);
            writer.sync().expect("Failed to sync");
        }

        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");

        let first_read = reader.read_all().expect("Failed to read all");
        assert_eq!(first_read.len(), 5);

        reader.reset().expect("Failed to reset");

        let second_read = reader.read_all().expect("Failed to read all");
        assert_eq!(second_read.len(), 5);
    }

    /// Transaction begin/commit markers come back in write order around the puts.
    #[test]
    fn test_reader_with_transaction_markers() {
        let (_temp_dir, wal_path) = setup_test_wal();

        {
            let mut writer = WalWriter::new(&wal_path, 64 * 1024 * 1024, SyncMode::Sync)
                .expect("Failed to create writer");

            writer.append(WalRecord::begin_tx(1)).expect("Failed to append");
            writer
                .append(WalRecord::put(b"key1".to_vec(), b"val1".to_vec()))
                .expect("Failed to append");
            writer
                .append(WalRecord::put(b"key2".to_vec(), b"val2".to_vec()))
                .expect("Failed to append");
            writer.append(WalRecord::commit_tx(1)).expect("Failed to append");
            writer.sync().expect("Failed to sync");
        }

        let mut reader = WalReader::new(&wal_path).expect("Failed to create reader");
        let records = reader.read_all().expect("Failed to read all");

        let expected_types = [
            crate::RecordType::BeginTx,
            crate::RecordType::Put,
            crate::RecordType::Put,
            crate::RecordType::CommitTx,
        ];
        assert_eq!(records.len(), 4);
        for (record, expected) in records.iter().zip(expected_types.iter()) {
            assert_eq!(record.record_type, *expected);
        }
    }
}
419}