1use std::fs::File;
12use std::io::{Read, Seek, SeekFrom};
13
14use crate::wal::codec::{decode, HEADER_LEN, VERSION};
15use crate::wal::event::WalEvent;
16use crate::wal::reader::{WalReader, WalReaderError};
17use crate::wal::tail_validation::{WalCorruption, WalCorruptionReasonCode};
18
/// Error returned when a [`WalFsReader`] cannot be constructed.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum WalFsReaderError {
    /// An I/O failure, stored as the rendered message of the underlying `std::io::Error`.
    IoError(String),
}

impl std::fmt::Display for WalFsReaderError {
    /// Renders the stored I/O message behind a fixed prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The enum has a single variant, so this pattern is irrefutable.
        let WalFsReaderError::IoError(e) = self;
        write!(f, "I/O error when opening WAL file: {e}")
    }
}

impl std::error::Error for WalFsReaderError {}

impl std::convert::From<std::io::Error> for WalFsReaderError {
    /// Keeps only the textual form of `err`, which lets the enum stay `Clone` and `Eq`.
    fn from(err: std::io::Error) -> Self {
        Self::IoError(err.to_string())
    }
}
41
/// Outcome of trying to fill a buffer completely from a reader.
enum ReadExactResult {
    /// The buffer was filled entirely.
    Ok,
    /// The reader reported end-of-input before a single byte was read.
    Eof,
    /// The reader reported end-of-input after some, but not all, bytes were read.
    Partial,
    /// A non-retryable I/O error occurred; carries the rendered error message.
    IoError(String),
}

/// Fills `buf` from `reader`, distinguishing a clean end-of-input from a truncated read.
///
/// Reads interrupted with `ErrorKind::Interrupted` are retried. An empty `buf` yields
/// `ReadExactResult::Ok` without touching the reader.
fn read_exact_or_eof(reader: &mut impl Read, buf: &mut [u8]) -> ReadExactResult {
    let wanted = buf.len();
    let mut filled = 0;

    loop {
        if filled >= wanted {
            return ReadExactResult::Ok;
        }

        let got = match reader.read(&mut buf[filled..]) {
            Ok(count) => count,
            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(err) => return ReadExactResult::IoError(err.to_string()),
        };

        // A zero-length read means end-of-input; whether that is clean depends on progress so far.
        if got == 0 {
            return match filled {
                0 => ReadExactResult::Eof,
                _ => ReadExactResult::Partial,
            };
        }

        filled += got;
    }
}
76
77pub struct WalFsReader {
89 file: File,
90 current_sequence: u64,
91 is_end: bool,
92 pending_event: Option<WalEvent>,
93}
94
95impl WalFsReader {
96 pub fn new(path: std::path::PathBuf) -> Result<Self, WalFsReaderError> {
110 let file = File::open(&path)?;
111
112 let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
114 let is_end = file_len == 0;
115
116 Ok(WalFsReader { file, current_sequence: 0, is_end, pending_event: None })
117 }
118
119 pub fn current_sequence(&self) -> u64 {
121 self.current_sequence
122 }
123
124 pub fn reset_eof(&mut self) {
127 self.is_end = false;
128 }
129
130 fn validate_full_file_strict(&mut self) -> Result<(), WalReaderError> {
133 let original_pos =
134 self.file.stream_position().map_err(|e| WalReaderError::IoError(e.to_string()))?;
135 let original_is_end = self.is_end;
136
137 self.file.seek(SeekFrom::Start(0)).map_err(|e| WalReaderError::IoError(e.to_string()))?;
138 self.is_end = false;
139
140 let result = loop {
141 match self.read_next_event_bytes() {
142 Ok(Some(_)) => continue,
143 Ok(None) => break Ok(()),
144 Err(e @ WalReaderError::Corruption(_)) => break Err(e),
145 Err(e) => break Err(e),
146 }
147 };
148
149 self.file
151 .seek(SeekFrom::Start(original_pos))
152 .map_err(|e| WalReaderError::IoError(e.to_string()))?;
153 self.is_end = original_is_end;
154
155 if let Err(e) = &result {
156 if matches!(e, WalReaderError::Corruption(_)) {
157 self.is_end = true;
158 }
159 }
160 result
161 }
162
163 fn read_next_event_bytes(&mut self) -> Result<Option<(WalEvent, usize)>, WalReaderError> {
174 let record_start =
175 self.file.stream_position().map_err(|e| WalReaderError::IoError(e.to_string()))?;
176
177 let mut header_buf = [0u8; HEADER_LEN];
179 let header_result = read_exact_or_eof(&mut self.file, &mut header_buf);
180 match header_result {
181 ReadExactResult::Ok => {}
182 ReadExactResult::Eof => {
183 self.is_end = true;
184 return Ok(None);
185 }
186 ReadExactResult::Partial => {
187 self.is_end = true;
188 return Err(WalReaderError::Corruption(WalCorruption {
189 offset: record_start,
190 reason: WalCorruptionReasonCode::IncompleteHeader,
191 }));
192 }
193 ReadExactResult::IoError(e) => return Err(WalReaderError::IoError(e)),
194 }
195
196 let version = u32::from_le_bytes(header_buf[0..4].try_into().unwrap());
198 let payload_len = u32::from_le_bytes(header_buf[4..8].try_into().unwrap()) as usize;
199
200 const MAX_REASONABLE_PAYLOAD: usize = 256 * 1024 * 1024; if payload_len > MAX_REASONABLE_PAYLOAD {
203 self.is_end = true;
204 return Err(WalReaderError::Corruption(WalCorruption {
205 offset: record_start,
206 reason: WalCorruptionReasonCode::DecodeFailure,
207 }));
208 }
209
210 if version != VERSION {
212 self.is_end = true;
213 return Err(WalReaderError::Corruption(WalCorruption {
214 offset: record_start,
215 reason: WalCorruptionReasonCode::UnsupportedVersion,
216 }));
217 }
218
219 let mut payload_buf = vec![0u8; payload_len];
221 match self.file.read_exact(&mut payload_buf) {
222 Ok(()) => {}
223 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
224 self.is_end = true;
225 return Err(WalReaderError::Corruption(WalCorruption {
226 offset: record_start,
227 reason: WalCorruptionReasonCode::IncompletePayload,
228 }));
229 }
230 Err(e) => return Err(WalReaderError::IoError(e.to_string())),
231 }
232
233 let total_len = HEADER_LEN + payload_len;
235 let mut record = Vec::with_capacity(total_len);
236 record.extend_from_slice(&header_buf);
237 record.extend_from_slice(&payload_buf);
238
239 match decode(&record) {
240 Ok(event) => Ok(Some((event, total_len))),
241 Err(crate::wal::codec::DecodeError::CrcMismatch { .. }) => {
242 self.is_end = true;
243 Err(WalReaderError::Corruption(WalCorruption {
244 offset: record_start,
245 reason: WalCorruptionReasonCode::CrcMismatch,
246 }))
247 }
248 Err(_) => {
249 self.is_end = true;
250 Err(WalReaderError::Corruption(WalCorruption {
251 offset: record_start,
252 reason: WalCorruptionReasonCode::DecodeFailure,
253 }))
254 }
255 }
256 }
257}
258
259impl WalReader for WalFsReader {
260 fn read_next(&mut self) -> Result<Option<WalEvent>, WalReaderError> {
275 if let Some(pending) = self.pending_event.take() {
277 self.current_sequence = pending.sequence();
278 return Ok(Some(pending));
279 }
280
281 if self.is_end {
282 return Ok(None);
283 }
284
285 match self.read_next_event_bytes() {
286 Ok(Some((event, _bytes_read))) => {
287 self.current_sequence = event.sequence();
288 Ok(Some(event))
289 }
290 Ok(None) => {
291 self.is_end = true;
292 Ok(None)
293 }
294 Err(e) => Err(e),
295 }
296 }
297
298 fn seek_to_sequence(&mut self, sequence: u64) -> Result<(), WalReaderError> {
313 self.validate_full_file_strict()?;
316
317 self.file.seek(SeekFrom::Start(0)).map_err(|e| WalReaderError::IoError(e.to_string()))?;
319
320 self.is_end = false;
321
322 while !self.is_end {
324 match self.read_next_event_bytes() {
325 Ok(Some((event, _))) => {
326 if event.sequence() == sequence {
327 self.pending_event = Some(event);
329 self.current_sequence = sequence;
330 return Ok(());
331 }
332 self.current_sequence = event.sequence();
333 }
334 Ok(None) => {
335 self.is_end = true;
337 return Err(WalReaderError::EndOfWal);
338 }
339 Err(e) => return Err(e),
340 }
341 }
342
343 Err(WalReaderError::EndOfWal)
344 }
345
346 fn is_end(&self) -> bool {
351 self.is_end
352 }
353}
354
#[cfg(test)]
mod tests {
    use std::fs;
    use std::io::Write;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    /// Keeps temp-file names distinct across the tests of one process.
    static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// Builds a temp-dir path unique to this process and call, removing any stale file there.
    ///
    /// The process id keeps concurrently running test processes from colliding on names.
    fn unique_temp_path(prefix: &str) -> PathBuf {
        let count = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
        let path =
            std::env::temp_dir().join(format!("{prefix}_{}_{count}.tmp", std::process::id()));
        let _ = fs::remove_file(&path);
        path
    }

    /// Temp WAL file that is deleted on drop, so a failing assertion does not leak it.
    struct TempWal {
        path: PathBuf,
    }

    impl TempWal {
        /// Reserves a fresh path without creating the file.
        fn new() -> Self {
            TempWal { path: unique_temp_path("actionqueue_wal_reader_test") }
        }

        /// Creates the file and fills it with one encoded event per entry of `sequences`.
        fn with_events(sequences: &[u64]) -> Self {
            let wal = TempWal::new();
            let mut writer = File::create(wal.path()).unwrap();
            append_events(&mut writer, sequences);
            wal
        }

        fn path(&self) -> &Path {
            &self.path
        }

        /// Opens a reader on the file, panicking with the path if that fails.
        fn reader(&self) -> WalFsReader {
            WalFsReader::new(self.path.clone()).unwrap_or_else(|e| {
                panic!("failed to open WAL file {}: {e}", self.path.display())
            })
        }
    }

    impl Drop for TempWal {
        fn drop(&mut self) {
            let _ = fs::remove_file(&self.path);
        }
    }

    /// Encodes one test event per sequence number, appends them to `file`, and flushes.
    fn append_events(file: &mut File, sequences: &[u64]) {
        for &seq in sequences {
            let bytes = crate::wal::codec::encode(&create_test_event(seq))
                .expect("encode should succeed");
            file.write_all(&bytes).unwrap();
        }
        file.flush().unwrap();
    }

    fn create_test_event(seq: u64) -> WalEvent {
        WalEvent::new(
            seq,
            crate::wal::event::WalEventType::TaskCreated {
                task_spec: actionqueue_core::task::task_spec::TaskSpec::new(
                    actionqueue_core::ids::TaskId::new(),
                    actionqueue_core::task::task_spec::TaskPayload::with_content_type(
                        vec![1, 2, 3],
                        "application/octet-stream",
                    ),
                    actionqueue_core::task::run_policy::RunPolicy::Once,
                    actionqueue_core::task::constraints::TaskConstraints::default(),
                    actionqueue_core::task::metadata::TaskMetadata::default(),
                )
                .expect("test task spec should be valid"),
                timestamp: 0,
            },
        )
    }

    #[test]
    fn test_new_returns_error_when_file_is_missing() {
        let path = unique_temp_path("actionqueue_wal_reader_missing_file");

        let result = WalFsReader::new(path);
        assert!(matches!(result, Err(WalFsReaderError::IoError(_))));
    }

    #[test]
    fn test_new_reader_on_empty_file() {
        let wal = TempWal::new();
        fs::write(wal.path(), []).unwrap();

        let mut reader = wal.reader();

        let result = reader.read_next();
        assert!(matches!(result, Ok(None)));
        assert!(reader.is_end());
    }

    #[test]
    fn test_read_next_returns_events_in_order() {
        let wal = TempWal::with_events(&[1, 2, 3]);
        let mut reader = wal.reader();

        for expected in [1u64, 2, 3] {
            let event = reader.read_next().expect("read should succeed");
            assert_eq!(event.expect("event should be present").sequence(), expected);
        }

        let past_end = reader.read_next().expect("read past the last event should return None");
        assert!(past_end.is_none());
        assert!(reader.is_end());
    }

    #[test]
    fn test_partial_record_detected_at_end() {
        let wal = TempWal::new();

        let mut writer = File::create(wal.path()).unwrap();
        append_events(&mut writer, &[1]);

        // Hand-built second record: a header announcing a 4-byte payload (version, length,
        // CRC placeholder) followed by only 2 payload bytes.
        writer.write_all(&crate::wal::codec::VERSION.to_le_bytes()).unwrap();
        writer.write_all(&4u32.to_le_bytes()).unwrap();
        writer.write_all(&0u32.to_le_bytes()).unwrap();
        writer.write_all(&[1u8, 2u8]).unwrap();
        writer.flush().unwrap();

        let mut reader = wal.reader();

        let e1 = reader.read_next().expect("First read should succeed");
        assert_eq!(e1.expect("event should be present").sequence(), 1);

        let e2 = reader.read_next();
        assert!(matches!(e2, Err(WalReaderError::Corruption(_))));
        assert!(reader.is_end());
    }

    #[test]
    fn test_seek_to_sequence() {
        let wal = TempWal::with_events(&[1, 2, 3]);

        let mut reader = wal.reader();
        reader.seek_to_sequence(2).expect("Seek should succeed");

        let next = reader.read_next().expect("Read after seek should succeed");
        assert_eq!(next.expect("event should be present").sequence(), 2);

        let mut reader2 = wal.reader();
        let result = reader2.seek_to_sequence(999);
        assert!(matches!(result, Err(WalReaderError::EndOfWal)));
    }

    #[test]
    fn test_current_sequence() {
        let wal = TempWal::with_events(&[42, 43]);
        let mut reader = wal.reader();

        reader.read_next().expect("First read should succeed");
        assert_eq!(reader.current_sequence(), 42);

        reader.read_next().expect("Second read should succeed");
        assert_eq!(reader.current_sequence(), 43);
    }

    #[test]
    fn test_reset_eof_allows_reading_appended_events() {
        let wal = TempWal::with_events(&[1, 2]);
        let mut reader = wal.reader();

        assert!(reader.read_next().expect("read 1").is_some());
        assert!(reader.read_next().expect("read 2").is_some());
        assert!(reader.read_next().expect("read eof").is_none());
        assert!(reader.is_end());

        {
            let mut writer = std::fs::OpenOptions::new().append(true).open(wal.path()).unwrap();
            append_events(&mut writer, &[3]);
        }

        // The end-of-WAL flag is sticky until it is reset explicitly.
        assert!(reader.read_next().expect("still eof").is_none());

        reader.reset_eof();
        let event = reader.read_next().expect("read after reset").expect("should have event 3");
        assert_eq!(event.sequence(), 3);
    }
}