use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use crate::wal::codec::{decode, HEADER_LEN, VERSION};
use crate::wal::event::WalEvent;
use crate::wal::reader::{WalReader, WalReaderError};
use crate::wal::tail_validation::{WalCorruption, WalCorruptionReasonCode};
/// Error produced while constructing a [`WalFsReader`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WalFsReaderError {
    /// An I/O failure, stored as the rendered message of the originating `std::io::Error`.
    IoError(String),
}

impl std::fmt::Display for WalFsReaderError {
    /// Renders the error as a single human-readable line.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Irrefutable while the enum has one variant; adding a variant turns this into a
        // compile error, which forces the message to be revisited.
        let Self::IoError(e) = self;
        write!(f, "I/O error when opening WAL file: {e}")
    }
}

impl std::error::Error for WalFsReaderError {}

impl From<std::io::Error> for WalFsReaderError {
    /// Captures the message of `err`; the original error kind is not retained.
    fn from(err: std::io::Error) -> Self {
        let message = err.to_string();
        Self::IoError(message)
    }
}
/// Outcome of trying to fill a buffer completely from a reader.
enum ReadExactResult {
    /// The buffer was filled.
    Ok,
    /// The reader reported end-of-input before any byte was read.
    Eof,
    /// The reader reported end-of-input after some, but not all, bytes were read.
    Partial,
    /// A non-retryable I/O error occurred; carries the rendered error message.
    IoError(String),
}

/// Reads from `reader` until `buf` is full, telling a clean end-of-input apart from a
/// truncated read.
///
/// Reads that fail with `ErrorKind::Interrupted` are retried. An empty `buf` yields
/// `ReadExactResult::Ok` without touching the reader.
fn read_exact_or_eof(reader: &mut impl Read, buf: &mut [u8]) -> ReadExactResult {
    let wanted = buf.len();
    let mut filled = 0usize;
    while filled < wanted {
        let outcome = reader.read(&mut buf[filled..]);
        match outcome {
            // Zero bytes means end-of-input; whether that is clean depends on progress so far.
            Ok(0) if filled == 0 => return ReadExactResult::Eof,
            Ok(0) => return ReadExactResult::Partial,
            Ok(count) => filled += count,
            Err(err) => {
                if err.kind() != std::io::ErrorKind::Interrupted {
                    return ReadExactResult::IoError(err.to_string());
                }
                // Interrupted: simply try the read again.
            }
        }
    }
    ReadExactResult::Ok
}
/// Sequential reader over a WAL file stored on the local filesystem.
pub struct WalFsReader {
    /// Open handle the records are read from.
    file: File,
    /// Sequence number of the event most recently returned or scanned past; 0 initially.
    current_sequence: u64,
    /// Set once end-of-file or corruption has been hit; while set, `read_next` yields `Ok(None)`.
    is_end: bool,
    /// Event staged by `seek_to_sequence`, handed out by the next `read_next` call.
    pending_event: Option<WalEvent>,
}
impl WalFsReader {
pub fn new(path: std::path::PathBuf) -> Result<Self, WalFsReaderError> {
let file = File::open(&path)?;
let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
let is_end = file_len == 0;
Ok(WalFsReader { file, current_sequence: 0, is_end, pending_event: None })
}
pub fn current_sequence(&self) -> u64 {
self.current_sequence
}
pub fn reset_eof(&mut self) {
self.is_end = false;
}
fn validate_full_file_strict(&mut self) -> Result<(), WalReaderError> {
let original_pos =
self.file.stream_position().map_err(|e| WalReaderError::IoError(e.to_string()))?;
let original_is_end = self.is_end;
self.file.seek(SeekFrom::Start(0)).map_err(|e| WalReaderError::IoError(e.to_string()))?;
self.is_end = false;
let result = loop {
match self.read_next_event_bytes() {
Ok(Some(_)) => continue,
Ok(None) => break Ok(()),
Err(e @ WalReaderError::Corruption(_)) => break Err(e),
Err(e) => break Err(e),
}
};
self.file
.seek(SeekFrom::Start(original_pos))
.map_err(|e| WalReaderError::IoError(e.to_string()))?;
self.is_end = original_is_end;
if let Err(e) = &result {
if matches!(e, WalReaderError::Corruption(_)) {
self.is_end = true;
}
}
result
}
fn read_next_event_bytes(&mut self) -> Result<Option<(WalEvent, usize)>, WalReaderError> {
let record_start =
self.file.stream_position().map_err(|e| WalReaderError::IoError(e.to_string()))?;
let mut header_buf = [0u8; HEADER_LEN];
let header_result = read_exact_or_eof(&mut self.file, &mut header_buf);
match header_result {
ReadExactResult::Ok => {}
ReadExactResult::Eof => {
self.is_end = true;
return Ok(None);
}
ReadExactResult::Partial => {
self.is_end = true;
return Err(WalReaderError::Corruption(WalCorruption {
offset: record_start,
reason: WalCorruptionReasonCode::IncompleteHeader,
}));
}
ReadExactResult::IoError(e) => return Err(WalReaderError::IoError(e)),
}
let version = u32::from_le_bytes(header_buf[0..4].try_into().unwrap());
let payload_len = u32::from_le_bytes(header_buf[4..8].try_into().unwrap()) as usize;
const MAX_REASONABLE_PAYLOAD: usize = 256 * 1024 * 1024; if payload_len > MAX_REASONABLE_PAYLOAD {
self.is_end = true;
return Err(WalReaderError::Corruption(WalCorruption {
offset: record_start,
reason: WalCorruptionReasonCode::DecodeFailure,
}));
}
if version != VERSION {
self.is_end = true;
return Err(WalReaderError::Corruption(WalCorruption {
offset: record_start,
reason: WalCorruptionReasonCode::UnsupportedVersion,
}));
}
let mut payload_buf = vec![0u8; payload_len];
match self.file.read_exact(&mut payload_buf) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
self.is_end = true;
return Err(WalReaderError::Corruption(WalCorruption {
offset: record_start,
reason: WalCorruptionReasonCode::IncompletePayload,
}));
}
Err(e) => return Err(WalReaderError::IoError(e.to_string())),
}
let total_len = HEADER_LEN + payload_len;
let mut record = Vec::with_capacity(total_len);
record.extend_from_slice(&header_buf);
record.extend_from_slice(&payload_buf);
match decode(&record) {
Ok(event) => Ok(Some((event, total_len))),
Err(crate::wal::codec::DecodeError::CrcMismatch { .. }) => {
self.is_end = true;
Err(WalReaderError::Corruption(WalCorruption {
offset: record_start,
reason: WalCorruptionReasonCode::CrcMismatch,
}))
}
Err(_) => {
self.is_end = true;
Err(WalReaderError::Corruption(WalCorruption {
offset: record_start,
reason: WalCorruptionReasonCode::DecodeFailure,
}))
}
}
}
}
impl WalReader for WalFsReader {
    /// Returns the next event, or `Ok(None)` once the end of the log has been reached.
    ///
    /// An event staged by a preceding `seek_to_sequence` is returned first. While the
    /// end-of-log flag is set the file is not read at all.
    ///
    /// # Errors
    ///
    /// Propagates any corruption or I/O error hit while reading the next record.
    fn read_next(&mut self) -> Result<Option<WalEvent>, WalReaderError> {
        if let Some(pending) = self.pending_event.take() {
            self.current_sequence = pending.sequence();
            return Ok(Some(pending));
        }
        if self.is_end {
            return Ok(None);
        }
        match self.read_next_event_bytes()? {
            Some((event, _bytes_read)) => {
                self.current_sequence = event.sequence();
                Ok(Some(event))
            }
            None => {
                self.is_end = true;
                Ok(None)
            }
        }
    }

    /// Positions the reader so that the next `read_next` returns the event whose sequence
    /// number equals `sequence`.
    ///
    /// The whole file is validated first, so corruption anywhere in the log fails the seek
    /// even if the target record precedes it. The file is then scanned from the start.
    ///
    /// # Errors
    ///
    /// * `WalReaderError::EndOfWal` if no event with that sequence number exists.
    /// * Any corruption or I/O error found during validation or scanning.
    fn seek_to_sequence(&mut self, sequence: u64) -> Result<(), WalReaderError> {
        self.validate_full_file_strict()?;
        self.file.seek(SeekFrom::Start(0)).map_err(|e| WalReaderError::IoError(e.to_string()))?;
        // The cursor has moved, so an event staged by an earlier seek no longer matches the
        // file position. Drop it so a failed search cannot leak it into `read_next`.
        self.pending_event = None;
        self.is_end = false;

        loop {
            match self.read_next_event_bytes()? {
                Some((event, _)) => {
                    self.current_sequence = event.sequence();
                    if event.sequence() == sequence {
                        // Stage the match; the file cursor is already past it.
                        self.pending_event = Some(event);
                        return Ok(());
                    }
                }
                None => {
                    self.is_end = true;
                    return Err(WalReaderError::EndOfWal);
                }
            }
        }
    }

    /// Reports whether the end-of-log flag is currently set.
    fn is_end(&self) -> bool {
        self.is_end
    }
}
#[cfg(test)]
mod tests {
    use std::fs;
    use std::io::Write;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    /// Keeps file names distinct between tests running in this process.
    static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// Returns a fresh, non-existent path in the system temp directory.
    ///
    /// The process id is part of the name so that concurrently running test binaries
    /// sharing the temp directory cannot clobber each other's files.
    fn temp_wal_path() -> PathBuf {
        let count = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
        let path = std::env::temp_dir().join(format!(
            "actionqueue_wal_reader_test_{}_{count}.tmp",
            std::process::id()
        ));
        let _ = fs::remove_file(&path);
        path
    }

    /// Builds a minimal `TaskCreated` event carrying the given sequence number.
    fn create_test_event(seq: u64) -> WalEvent {
        WalEvent::new(
            seq,
            crate::wal::event::WalEventType::TaskCreated {
                task_spec: actionqueue_core::task::task_spec::TaskSpec::new(
                    actionqueue_core::ids::TaskId::new(),
                    actionqueue_core::task::task_spec::TaskPayload::with_content_type(
                        vec![1, 2, 3],
                        "application/octet-stream",
                    ),
                    actionqueue_core::task::run_policy::RunPolicy::Once,
                    actionqueue_core::task::constraints::TaskConstraints::default(),
                    actionqueue_core::task::metadata::TaskMetadata::default(),
                )
                .expect("test task spec should be valid"),
                timestamp: 0,
            },
        )
    }

    /// Creates (or truncates) `path` and writes one encoded test event per sequence number.
    fn write_events(path: &Path, sequences: &[u64]) {
        let mut writer = File::create(path).unwrap();
        for &seq in sequences {
            let bytes =
                crate::wal::codec::encode(&create_test_event(seq)).expect("encode should succeed");
            writer.write_all(&bytes).unwrap();
        }
        writer.flush().unwrap();
    }

    #[test]
    fn test_new_returns_error_when_file_is_missing() {
        let path = std::env::temp_dir().join(format!(
            "actionqueue_wal_reader_missing_file_{}_{}",
            std::process::id(),
            TEST_COUNTER.fetch_add(1, Ordering::SeqCst)
        ));
        let _ = fs::remove_file(&path);
        let result = WalFsReader::new(path);
        assert!(matches!(result, Err(WalFsReaderError::IoError(_))));
    }

    #[test]
    fn test_new_reader_on_empty_file() {
        let path = temp_wal_path();
        fs::write(&path, []).unwrap();
        let mut reader = WalFsReader::new(path.clone())
            .expect("Failed to open WAL file for test_new_reader_on_empty_file");
        let result = reader.read_next();
        assert!(matches!(result, Ok(None)));
        assert!(reader.is_end());
        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_read_next_returns_events_in_order() {
        let path = temp_wal_path();
        write_events(&path, &[1, 2, 3]);
        let mut reader = WalFsReader::new(path.clone())
            .expect("Failed to open WAL file for test_read_next_returns_events_in_order");
        let e1 = reader.read_next().expect("First read should succeed");
        assert!(e1.is_some());
        assert_eq!(e1.as_ref().unwrap().sequence(), 1);
        let e2 = reader.read_next().expect("Second read should succeed");
        assert!(e2.is_some());
        assert_eq!(e2.as_ref().unwrap().sequence(), 2);
        let e3 = reader.read_next().expect("Third read should succeed");
        assert!(e3.is_some());
        assert_eq!(e3.as_ref().unwrap().sequence(), 3);
        let e4 = reader.read_next().expect("Fourth read should return None");
        assert!(e4.is_none());
        assert!(reader.is_end());
        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_partial_record_detected_at_end() {
        let path = temp_wal_path();
        let event1 = create_test_event(1);
        let event1_bytes = crate::wal::codec::encode(&event1).expect("encode should succeed");
        let mut writer = File::create(&path).unwrap();
        writer.write_all(&event1_bytes).unwrap();
        // Hand-built trailing record: the version, a declared payload length of 4, one
        // further zero u32, and then only two of the four promised payload bytes.
        writer.write_all(&crate::wal::codec::VERSION.to_le_bytes()).unwrap();
        writer.write_all(&4u32.to_le_bytes()).unwrap();
        writer.write_all(&0u32.to_le_bytes()).unwrap();
        writer.write_all(&[1u8, 2u8]).unwrap();
        writer.flush().unwrap();
        let mut reader = WalFsReader::new(path.clone())
            .expect("Failed to open WAL file for test_partial_record_detected_at_end");
        let e1 = reader.read_next().expect("First read should succeed");
        assert!(e1.is_some());
        assert_eq!(e1.as_ref().unwrap().sequence(), 1);
        let e2 = reader.read_next();
        assert!(matches!(e2, Err(WalReaderError::Corruption(_))));
        assert!(reader.is_end());
        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_seek_to_sequence() {
        let path = temp_wal_path();
        write_events(&path, &[1, 2, 3]);
        let mut reader = WalFsReader::new(path.clone())
            .expect("Failed to open WAL file for test_seek_to_sequence");
        reader.seek_to_sequence(2).expect("Seek should succeed");
        let next = reader.read_next().expect("Read after seek should succeed");
        assert!(next.is_some());
        assert_eq!(next.as_ref().unwrap().sequence(), 2);
        let mut reader2 = WalFsReader::new(path.clone())
            .expect("Failed to open WAL file for test_seek_to_sequence (second reader)");
        let result = reader2.seek_to_sequence(999);
        assert!(matches!(result, Err(WalReaderError::EndOfWal)));
        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_current_sequence() {
        let path = temp_wal_path();
        write_events(&path, &[42, 43]);
        let mut reader = WalFsReader::new(path.clone())
            .expect("Failed to open WAL file for test_current_sequence");
        reader.read_next().expect("First read should succeed");
        assert_eq!(reader.current_sequence(), 42);
        reader.read_next().expect("Second read should succeed");
        assert_eq!(reader.current_sequence(), 43);
        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_reset_eof_allows_reading_appended_events() {
        let path = temp_wal_path();
        write_events(&path, &[1, 2]);
        let mut reader =
            WalFsReader::new(path.clone()).expect("Failed to open WAL file for test_reset_eof");
        assert!(reader.read_next().expect("read 1").is_some());
        assert!(reader.read_next().expect("read 2").is_some());
        assert!(reader.read_next().expect("read eof").is_none());
        assert!(reader.is_end());
        {
            let mut writer = std::fs::OpenOptions::new().append(true).open(&path).unwrap();
            let event3 = create_test_event(3);
            writer
                .write_all(&crate::wal::codec::encode(&event3).expect("encode should succeed"))
                .unwrap();
            writer.flush().unwrap();
        }
        // The end-of-log flag is sticky until it is explicitly reset.
        assert!(reader.read_next().expect("still eof").is_none());
        reader.reset_eof();
        let event = reader.read_next().expect("read after reset").expect("should have event 3");
        assert_eq!(event.sequence(), 3);
        let _ = fs::remove_file(path);
    }
}