Skip to main content

actionqueue_storage/wal/
fs_reader.rs

1//! WAL file system reader.
2//!
3//! This module provides a file system backed WAL reader that sequentially reads
4//! events from a file using the encoded binary format from the codec module.
5//!
6//! The reader enforces strict framed-record integrity with one shared tail
7//! validator. Any corruption (including partial header, partial payload,
8//! unsupported version, or decode-invalid record) is surfaced as typed
9//! [`WalReaderError::Corruption`] details with record-start offsets.
10
11use std::fs::File;
12use std::io::{Read, Seek, SeekFrom};
13
14use crate::wal::codec::{decode, HEADER_LEN, VERSION};
15use crate::wal::event::WalEvent;
16use crate::wal::reader::{WalReader, WalReaderError};
17use crate::wal::tail_validation::{WalCorruption, WalCorruptionReasonCode};
18
/// Errors that can occur when creating a [`WalFsReader`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WalFsReaderError {
    /// I/O failure while opening the WAL file; holds the rendered
    /// [`std::io::Error`] message.
    IoError(String),
}

impl std::fmt::Display for WalFsReaderError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum, so this destructuring is irrefutable; adding a
        // variant will (intentionally) force this impl to be revisited.
        let WalFsReaderError::IoError(message) = self;
        f.write_str("I/O error when opening WAL file: ")?;
        f.write_str(message)
    }
}

impl std::error::Error for WalFsReaderError {}

impl From<std::io::Error> for WalFsReaderError {
    /// Wraps an I/O error by capturing its `Display` text.
    fn from(err: std::io::Error) -> Self {
        Self::IoError(err.to_string())
    }
}
41
/// Outcome of [`read_exact_or_eof`].
enum ReadExactResult {
    /// The buffer was filled completely.
    Ok,
    /// The reader was already exhausted: not a single byte was available.
    Eof,
    /// The reader ran dry after yielding some, but not all, requested bytes.
    Partial,
    /// A non-`Interrupted` I/O error, carried as its rendered message.
    IoError(String),
}

/// Fills `buf` completely from `reader`, telling a clean end-of-stream apart
/// from a truncated one.
///
/// `Read::read_exact` reports `UnexpectedEof` whether zero or some bytes were
/// read. This function instead returns [`ReadExactResult::Eof`] when nothing
/// was available and [`ReadExactResult::Partial`] when the stream ended
/// mid-buffer. `Interrupted` errors are retried; any other error is returned
/// as [`ReadExactResult::IoError`]. An empty `buf` yields
/// [`ReadExactResult::Ok`] without touching the reader.
fn read_exact_or_eof(reader: &mut impl Read, buf: &mut [u8]) -> ReadExactResult {
    let mut filled: usize = 0;
    loop {
        if filled >= buf.len() {
            return ReadExactResult::Ok;
        }

        let got = match reader.read(&mut buf[filled..]) {
            Ok(count) => count,
            // Signal interruption is transient; simply try again.
            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(err) => return ReadExactResult::IoError(err.to_string()),
        };

        if got == 0 {
            // Stream exhausted: distinguish "nothing at all" from "torn".
            return if filled == 0 { ReadExactResult::Eof } else { ReadExactResult::Partial };
        }
        filled += got;
    }
}
76
77/// A file system backed WAL reader.
78///
79/// The reader opens a file and sequentially reads encoded WAL events.
80/// Each event is expected to follow the framed binary format from the codec:
81/// - 4 bytes version (currently 2)
82/// - 4 bytes length of payload
83/// - 4 bytes CRC-32 checksum
84/// - payload bytes
85///
86/// If strict framed validation finds corruption, the reader returns
87/// [`WalReaderError::Corruption`] with typed offset+reason diagnostics.
88pub struct WalFsReader {
89    file: File,
90    current_sequence: u64,
91    is_end: bool,
92    pending_event: Option<WalEvent>,
93}
94
95impl WalFsReader {
96    /// Creates a new WAL reader from the given path.
97    ///
98    /// Opens the file in read-only mode and positions the reader at the
99    /// beginning of the file. The reader will start reading from sequence
100    /// 0 (or the first event in the file).
101    ///
102    /// # Arguments
103    ///
104    /// * `path` - The filesystem path to the WAL file
105    ///
106    /// # Errors
107    ///
108    /// Returns `WalFsReaderError::IoError` if the file cannot be opened.
109    pub fn new(path: std::path::PathBuf) -> Result<Self, WalFsReaderError> {
110        let file = File::open(&path)?;
111
112        // Determine if file is empty by checking file size
113        let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
114        let is_end = file_len == 0;
115
116        Ok(WalFsReader { file, current_sequence: 0, is_end, pending_event: None })
117    }
118
119    /// Returns the current sequence number being read.
120    pub fn current_sequence(&self) -> u64 {
121        self.current_sequence
122    }
123
124    /// Resets the end-of-file flag, allowing the reader to attempt reading
125    /// new events that may have been appended after the previous EOF.
126    pub fn reset_eof(&mut self) {
127        self.is_end = false;
128    }
129
130    /// Strictly validates the full WAL from file start using streaming record-by-record
131    /// reads, then restores the prior cursor. Does not buffer the entire file in memory.
132    fn validate_full_file_strict(&mut self) -> Result<(), WalReaderError> {
133        let original_pos =
134            self.file.stream_position().map_err(|e| WalReaderError::IoError(e.to_string()))?;
135        let original_is_end = self.is_end;
136
137        self.file.seek(SeekFrom::Start(0)).map_err(|e| WalReaderError::IoError(e.to_string()))?;
138        self.is_end = false;
139
140        let result = loop {
141            match self.read_next_event_bytes() {
142                Ok(Some(_)) => continue,
143                Ok(None) => break Ok(()),
144                Err(e @ WalReaderError::Corruption(_)) => break Err(e),
145                Err(e) => break Err(e),
146            }
147        };
148
149        // Restore original position regardless of validation outcome
150        self.file
151            .seek(SeekFrom::Start(original_pos))
152            .map_err(|e| WalReaderError::IoError(e.to_string()))?;
153        self.is_end = original_is_end;
154
155        if let Err(e) = &result {
156            if matches!(e, WalReaderError::Corruption(_)) {
157                self.is_end = true;
158            }
159        }
160        result
161    }
162
163    /// Attempts to read the next event using streaming two-phase read.
164    ///
165    /// Phase 1: Read exactly `HEADER_LEN` (12) bytes for version + length + CRC.
166    /// Phase 2: Read exactly `payload_len` bytes for the payload.
167    ///
168    /// This avoids buffering the entire WAL tail on every read.
169    ///
170    /// Returns `Ok(Some((event, bytes_read)))` if a complete event was read,
171    /// `Ok(None)` if end of file was reached cleanly,
172    /// or `Err(WalReaderError::Corruption)` if strict corruption is detected.
173    fn read_next_event_bytes(&mut self) -> Result<Option<(WalEvent, usize)>, WalReaderError> {
174        let record_start =
175            self.file.stream_position().map_err(|e| WalReaderError::IoError(e.to_string()))?;
176
177        // Phase 1: Read the 12-byte header (version + length + CRC-32)
178        let mut header_buf = [0u8; HEADER_LEN];
179        let header_result = read_exact_or_eof(&mut self.file, &mut header_buf);
180        match header_result {
181            ReadExactResult::Ok => {}
182            ReadExactResult::Eof => {
183                self.is_end = true;
184                return Ok(None);
185            }
186            ReadExactResult::Partial => {
187                self.is_end = true;
188                return Err(WalReaderError::Corruption(WalCorruption {
189                    offset: record_start,
190                    reason: WalCorruptionReasonCode::IncompleteHeader,
191                }));
192            }
193            ReadExactResult::IoError(e) => return Err(WalReaderError::IoError(e)),
194        }
195
196        // Parse header fields
197        let version = u32::from_le_bytes(header_buf[0..4].try_into().unwrap());
198        let payload_len = u32::from_le_bytes(header_buf[4..8].try_into().unwrap()) as usize;
199
200        // Guard against unreasonable allocation from corrupted length field
201        const MAX_REASONABLE_PAYLOAD: usize = 256 * 1024 * 1024; // 256 MiB
202        if payload_len > MAX_REASONABLE_PAYLOAD {
203            self.is_end = true;
204            return Err(WalReaderError::Corruption(WalCorruption {
205                offset: record_start,
206                reason: WalCorruptionReasonCode::DecodeFailure,
207            }));
208        }
209
210        // Validate version before reading payload
211        if version != VERSION {
212            self.is_end = true;
213            return Err(WalReaderError::Corruption(WalCorruption {
214                offset: record_start,
215                reason: WalCorruptionReasonCode::UnsupportedVersion,
216            }));
217        }
218
219        // Phase 2: Read exactly payload_len bytes
220        let mut payload_buf = vec![0u8; payload_len];
221        match self.file.read_exact(&mut payload_buf) {
222            Ok(()) => {}
223            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
224                self.is_end = true;
225                return Err(WalReaderError::Corruption(WalCorruption {
226                    offset: record_start,
227                    reason: WalCorruptionReasonCode::IncompletePayload,
228                }));
229            }
230            Err(e) => return Err(WalReaderError::IoError(e.to_string())),
231        }
232
233        // Assemble full record and decode
234        let total_len = HEADER_LEN + payload_len;
235        let mut record = Vec::with_capacity(total_len);
236        record.extend_from_slice(&header_buf);
237        record.extend_from_slice(&payload_buf);
238
239        match decode(&record) {
240            Ok(event) => Ok(Some((event, total_len))),
241            Err(crate::wal::codec::DecodeError::CrcMismatch { .. }) => {
242                self.is_end = true;
243                Err(WalReaderError::Corruption(WalCorruption {
244                    offset: record_start,
245                    reason: WalCorruptionReasonCode::CrcMismatch,
246                }))
247            }
248            Err(_) => {
249                self.is_end = true;
250                Err(WalReaderError::Corruption(WalCorruption {
251                    offset: record_start,
252                    reason: WalCorruptionReasonCode::DecodeFailure,
253                }))
254            }
255        }
256    }
257}
258
259impl WalReader for WalFsReader {
260    /// Read the next event from the WAL.
261    ///
262    /// Attempts to read the next complete event from the file. If strict tail
263    /// validation detects corruption at or after the current cursor, returns
264    /// `WalReaderError::Corruption`.
265    ///
266    /// If the end of the file is reached with no corruption (all events
267    /// read), returns `Ok(None)`.
268    ///
269    /// # Errors
270    ///
271    /// Returns `WalReaderError::Corruption` if strict corruption is encountered.
272    ///
273    /// Returns `WalReaderError::IoError` if an I/O error occurs.
274    fn read_next(&mut self) -> Result<Option<WalEvent>, WalReaderError> {
275        // Check if we have a pending event from seek_to_sequence
276        if let Some(pending) = self.pending_event.take() {
277            self.current_sequence = pending.sequence();
278            return Ok(Some(pending));
279        }
280
281        if self.is_end {
282            return Ok(None);
283        }
284
285        match self.read_next_event_bytes() {
286            Ok(Some((event, _bytes_read))) => {
287                self.current_sequence = event.sequence();
288                Ok(Some(event))
289            }
290            Ok(None) => {
291                self.is_end = true;
292                Ok(None)
293            }
294            Err(e) => Err(e),
295        }
296    }
297
298    /// Seek to a specific sequence number.
299    ///
300    /// This implementation performs a linear scan from the beginning of the
301    /// file to find the event with the specified sequence number. This is
302    /// O(n) but guarantees correctness.
303    ///
304    /// If the sequence number is not found in the file, returns
305    /// `WalReaderError::EndOfWal`.
306    ///
307    /// # Errors
308    ///
309    /// Returns `WalReaderError::Corruption` if any corruption is detected
310    /// while scanning. Per strict policy this fails even when target sequence
311    /// appears before corruption.
312    fn seek_to_sequence(&mut self, sequence: u64) -> Result<(), WalReaderError> {
313        // Strict seek policy: fail on any corruption in the WAL, even when the
314        // target sequence appears before the corrupt tail.
315        self.validate_full_file_strict()?;
316
317        // Reset to beginning
318        self.file.seek(SeekFrom::Start(0)).map_err(|e| WalReaderError::IoError(e.to_string()))?;
319
320        self.is_end = false;
321
322        // Scan for the target sequence
323        while !self.is_end {
324            match self.read_next_event_bytes() {
325                Ok(Some((event, _))) => {
326                    if event.sequence() == sequence {
327                        // Found the target! Store it as pending so it's returned on next read
328                        self.pending_event = Some(event);
329                        self.current_sequence = sequence;
330                        return Ok(());
331                    }
332                    self.current_sequence = event.sequence();
333                }
334                Ok(None) => {
335                    // End of file without finding the sequence
336                    self.is_end = true;
337                    return Err(WalReaderError::EndOfWal);
338                }
339                Err(e) => return Err(e),
340            }
341        }
342
343        Err(WalReaderError::EndOfWal)
344    }
345
346    /// Returns true if the reader is at the end of the WAL.
347    ///
348    /// Returns true if all complete events have been read or if corruption was
349    /// encountered.
350    fn is_end(&self) -> bool {
351        self.is_end
352    }
353}
354
#[cfg(test)]
mod tests {
    use std::fs;
    use std::io::Write;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    // Atomic counter to ensure unique test file paths within one process.
    static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// Returns a temp-file path unique to this process and call.
    ///
    /// Including the process id keeps concurrent test runs that share a temp
    /// directory from clobbering each other's files.
    fn temp_wal_path() -> PathBuf {
        let count = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
        let path = std::env::temp_dir().join(format!(
            "actionqueue_wal_reader_test_{}_{count}.tmp",
            std::process::id()
        ));
        // Clean up if left over from an earlier run.
        let _ = fs::remove_file(&path);
        path
    }

    /// Temporary WAL file that is deleted on drop, including when an
    /// assertion panics mid-test.
    ///
    /// Bind it before any reader: locals drop in reverse declaration order,
    /// so the reader's file handle is closed before the file is removed.
    struct TempWal(PathBuf);

    impl TempWal {
        fn new() -> Self {
            TempWal(temp_wal_path())
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempWal {
        fn drop(&mut self) {
            let _ = fs::remove_file(&self.0);
        }
    }

    fn create_test_event(seq: u64) -> WalEvent {
        WalEvent::new(
            seq,
            crate::wal::event::WalEventType::TaskCreated {
                task_spec: actionqueue_core::task::task_spec::TaskSpec::new(
                    actionqueue_core::ids::TaskId::new(),
                    actionqueue_core::task::task_spec::TaskPayload::with_content_type(
                        vec![1, 2, 3],
                        "application/octet-stream",
                    ),
                    actionqueue_core::task::run_policy::RunPolicy::Once,
                    actionqueue_core::task::constraints::TaskConstraints::default(),
                    actionqueue_core::task::metadata::TaskMetadata::default(),
                )
                .expect("test task spec should be valid"),
                timestamp: 0,
            },
        )
    }

    /// Writes one encoded event per sequence number to `writer`, then flushes.
    fn write_encoded(writer: &mut File, sequences: &[u64]) {
        for &seq in sequences {
            let bytes = crate::wal::codec::encode(&create_test_event(seq))
                .expect("encode should succeed");
            writer.write_all(&bytes).unwrap();
        }
        writer.flush().unwrap();
    }

    /// Creates (or truncates) `path` and writes the given events.
    fn write_events(path: &Path, sequences: &[u64]) {
        let mut writer = File::create(path).unwrap();
        write_encoded(&mut writer, sequences);
    }

    /// Appends the given events to the existing file at `path`.
    fn append_events(path: &Path, sequences: &[u64]) {
        let mut writer = fs::OpenOptions::new().append(true).open(path).unwrap();
        write_encoded(&mut writer, sequences);
    }

    /// Appends raw bytes (e.g. a torn record) to the existing file at `path`.
    fn append_bytes(path: &Path, bytes: &[u8]) {
        let mut writer = fs::OpenOptions::new().append(true).open(path).unwrap();
        writer.write_all(bytes).unwrap();
        writer.flush().unwrap();
    }

    fn open_reader(path: &Path) -> WalFsReader {
        WalFsReader::new(path.to_path_buf()).expect("Failed to open WAL file")
    }

    #[test]
    fn test_new_returns_error_when_file_is_missing() {
        // temp_wal_path() already removed any file at this path.
        let path = temp_wal_path();

        let result = WalFsReader::new(path);
        assert!(matches!(result, Err(WalFsReaderError::IoError(_))));
    }

    #[test]
    fn test_new_reader_on_empty_file() {
        let wal = TempWal::new();
        fs::write(wal.path(), []).unwrap();

        let mut reader = open_reader(wal.path());

        // An empty file is at end-of-WAL before any read...
        assert!(reader.is_end());
        // ...and read_next returns None immediately.
        assert!(matches!(reader.read_next(), Ok(None)));
        assert!(reader.is_end());
    }

    #[test]
    fn test_read_next_returns_events_in_order() {
        let wal = TempWal::new();
        write_events(wal.path(), &[1, 2, 3]);

        let mut reader = open_reader(wal.path());

        for expected in 1..=3 {
            let event = reader
                .read_next()
                .expect("read should succeed")
                .expect("event should be present");
            assert_eq!(event.sequence(), expected);
        }

        // Should return None when at end.
        assert!(reader.read_next().expect("read at end should succeed").is_none());
        assert!(reader.is_end());
    }

    #[test]
    fn test_partial_record_detected_at_end() {
        let wal = TempWal::new();
        write_events(wal.path(), &[1]);
        let torn_record_start = fs::metadata(wal.path()).unwrap().len();

        // Torn record: full header announcing 4 payload bytes, but only 2 present.
        let mut torn = Vec::new();
        torn.extend_from_slice(&crate::wal::codec::VERSION.to_le_bytes());
        torn.extend_from_slice(&4u32.to_le_bytes()); // length = 4
        torn.extend_from_slice(&0u32.to_le_bytes()); // CRC (dummy)
        torn.extend_from_slice(&[1u8, 2u8]); // only 2 bytes of 4 expected
        append_bytes(wal.path(), &torn);

        let mut reader = open_reader(wal.path());

        let e1 = reader.read_next().expect("First read should succeed");
        assert_eq!(e1.expect("first event present").sequence(), 1);

        // Second read reports the torn record at its start offset.
        let e2 = reader.read_next();
        assert!(matches!(
            e2,
            Err(WalReaderError::Corruption(WalCorruption {
                offset,
                reason: WalCorruptionReasonCode::IncompletePayload,
                ..
            })) if offset == torn_record_start
        ));
        assert!(reader.is_end());
    }

    #[test]
    fn test_partial_header_reported_as_incomplete_header() {
        let wal = TempWal::new();
        fs::write(wal.path(), vec![0u8; HEADER_LEN - 1]).unwrap();

        let mut reader = open_reader(wal.path());

        assert!(matches!(
            reader.read_next(),
            Err(WalReaderError::Corruption(WalCorruption {
                offset: 0,
                reason: WalCorruptionReasonCode::IncompleteHeader,
                ..
            }))
        ));
        assert!(reader.is_end());
    }

    #[test]
    fn test_unsupported_version_reported_as_corruption() {
        let wal = TempWal::new();
        let mut header = Vec::new();
        header.extend_from_slice(&crate::wal::codec::VERSION.wrapping_add(1).to_le_bytes());
        header.extend_from_slice(&0u32.to_le_bytes()); // length = 0
        header.extend_from_slice(&0u32.to_le_bytes()); // CRC (dummy)
        fs::write(wal.path(), &header).unwrap();

        let mut reader = open_reader(wal.path());

        assert!(matches!(
            reader.read_next(),
            Err(WalReaderError::Corruption(WalCorruption {
                offset: 0,
                reason: WalCorruptionReasonCode::UnsupportedVersion,
                ..
            }))
        ));
        assert!(reader.is_end());
    }

    #[test]
    fn test_seek_to_sequence() {
        let wal = TempWal::new();
        write_events(wal.path(), &[1, 2, 3]);

        let mut reader = open_reader(wal.path());

        // Seek to sequence 2; the next read starts there.
        reader.seek_to_sequence(2).expect("Seek should succeed");
        let next = reader.read_next().expect("Read after seek should succeed");
        assert_eq!(next.expect("event after seek").sequence(), 2);

        // Seek to a non-existent sequence returns EndOfWal.
        let mut reader2 = open_reader(wal.path());
        let result = reader2.seek_to_sequence(999);
        assert!(matches!(result, Err(WalReaderError::EndOfWal)));
    }

    #[test]
    fn test_seek_fails_on_corruption_after_target() {
        let wal = TempWal::new();
        write_events(wal.path(), &[1, 2]);
        append_bytes(wal.path(), &[0u8; 3]); // torn header after the last event

        let mut reader = open_reader(wal.path());

        // Strict policy: corruption anywhere fails the seek, even though
        // sequence 1 precedes it.
        assert!(matches!(reader.seek_to_sequence(1), Err(WalReaderError::Corruption(_))));
    }

    #[test]
    fn test_current_sequence() {
        let wal = TempWal::new();
        write_events(wal.path(), &[42, 43]);

        let mut reader = open_reader(wal.path());

        reader.read_next().expect("First read should succeed");
        assert_eq!(reader.current_sequence(), 42);

        reader.read_next().expect("Second read should succeed");
        assert_eq!(reader.current_sequence(), 43);
    }

    #[test]
    fn test_reset_eof_allows_reading_appended_events() {
        let wal = TempWal::new();
        write_events(wal.path(), &[1, 2]);

        let mut reader = open_reader(wal.path());

        // Read to EOF.
        assert!(reader.read_next().expect("read 1").is_some());
        assert!(reader.read_next().expect("read 2").is_some());
        assert!(reader.read_next().expect("read eof").is_none());
        assert!(reader.is_end());

        // Append a 3rd event externally.
        append_events(wal.path(), &[3]);

        // Without reset, read_next returns None.
        assert!(reader.read_next().expect("still eof").is_none());

        // After reset_eof, the new event is visible.
        reader.reset_eof();
        let event = reader.read_next().expect("read after reset").expect("should have event 3");
        assert_eq!(event.sequence(), 3);
    }
}