use crate::wal::codec::{decode, DecodeError, HEADER_LEN, VERSION};
use crate::wal::event::WalEvent;
// Local alias for the codec's `HEADER_LEN`: the number of bytes that must remain at an
// offset before a record header is read, and the amount added to the declared payload
// length to obtain a record's total length.
const WAL_HEADER_LEN: usize = HEADER_LEN;
/// Classification of why a WAL record failed validation.
///
/// Produced by `parse_record_at`; the `Display` form of each variant is a
/// snake_case code such as `crc_mismatch`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WalCorruptionReasonCode {
    /// Fewer bytes remain at the record offset than a full header requires.
    IncompleteHeader,
    /// The record's declared end lies beyond the end of the buffer.
    IncompletePayload,
    /// The header's version field differs from the codec's `VERSION`, or the
    /// codec itself reported an unsupported version.
    UnsupportedVersion,
    /// The declared length overflowed `usize` arithmetic, or the codec
    /// returned `DecodeError::InvalidLength` / `DecodeError::Decode`.
    DecodeFailure,
    /// The codec returned `DecodeError::CrcMismatch` for the record.
    CrcMismatch,
}
impl std::fmt::Display for WalCorruptionReasonCode {
    /// Writes the snake_case code for this reason, e.g. `crc_mismatch`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label first so the formatter is written to in exactly one place.
        let code = match *self {
            WalCorruptionReasonCode::IncompleteHeader => "incomplete_header",
            WalCorruptionReasonCode::IncompletePayload => "incomplete_payload",
            WalCorruptionReasonCode::UnsupportedVersion => "unsupported_version",
            WalCorruptionReasonCode::DecodeFailure => "decode_failure",
            WalCorruptionReasonCode::CrcMismatch => "crc_mismatch",
        };
        f.write_str(code)
    }
}
/// A validation failure located within a WAL byte buffer.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct WalCorruption {
    /// Byte offset associated with the failure; `parse_record_at` reports the
    /// offset at which the failing record starts.
    pub offset: u64,
    /// Classification of the failure.
    pub reason: WalCorruptionReasonCode,
}
impl WalCorruption {
pub fn new(offset: usize, reason: WalCorruptionReasonCode) -> Self {
Self { offset: offset as u64, reason }
}
}
impl std::fmt::Display for WalCorruption {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "WAL corruption at offset {} ({})", self.offset, self.reason)
}
}
// Lets `WalCorruption` be used wherever a `std::error::Error` is expected. No trait
// methods are overridden; the impl relies on the type's `Debug` derive and `Display` impl.
impl std::error::Error for WalCorruption {}
/// Result of a strict validation pass in which every record parsed cleanly.
#[must_use]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct WalValidationSummary {
    /// Highest sequence number among the validated records; `0` when the
    /// buffer held no records.
    pub last_valid_sequence: u64,
    /// Byte offset immediately after the final validated record.
    pub end_offset: u64,
}
/// A single record successfully parsed out of a WAL buffer.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParsedWalRecord {
    /// The decoded event.
    pub event: WalEvent,
    /// Total length of the record in bytes (header plus payload), i.e. the
    /// distance from this record's offset to the next one.
    pub record_len: usize,
}
pub fn parse_record_at(
buffer: &[u8],
offset: usize,
) -> Result<Option<ParsedWalRecord>, WalCorruption> {
if offset >= buffer.len() {
return Ok(None);
}
let remaining = buffer.len() - offset;
if remaining < WAL_HEADER_LEN {
return Err(WalCorruption::new(offset, WalCorruptionReasonCode::IncompleteHeader));
}
let version = u32::from_le_bytes(
buffer[offset..offset + 4]
.try_into()
.map_err(|_| WalCorruption::new(offset, WalCorruptionReasonCode::IncompleteHeader))?,
);
let payload_len = u32::from_le_bytes(
buffer[offset + 4..offset + 8]
.try_into()
.map_err(|_| WalCorruption::new(offset, WalCorruptionReasonCode::IncompleteHeader))?,
) as usize;
let total_len = WAL_HEADER_LEN
.checked_add(payload_len)
.ok_or_else(|| WalCorruption::new(offset, WalCorruptionReasonCode::DecodeFailure))?;
let record_end = offset
.checked_add(total_len)
.ok_or_else(|| WalCorruption::new(offset, WalCorruptionReasonCode::DecodeFailure))?;
if record_end > buffer.len() {
return Err(WalCorruption::new(offset, WalCorruptionReasonCode::IncompletePayload));
}
if version != VERSION {
return Err(WalCorruption::new(offset, WalCorruptionReasonCode::UnsupportedVersion));
}
let event =
decode(&buffer[offset..offset + total_len]).map_err(|decode_error| match decode_error {
DecodeError::UnsupportedVersion(_) => {
WalCorruption::new(offset, WalCorruptionReasonCode::UnsupportedVersion)
}
DecodeError::CrcMismatch { .. } => {
WalCorruption::new(offset, WalCorruptionReasonCode::CrcMismatch)
}
DecodeError::InvalidLength(_) | DecodeError::Decode(_) => {
WalCorruption::new(offset, WalCorruptionReasonCode::DecodeFailure)
}
})?;
Ok(Some(ParsedWalRecord { event, record_len: total_len }))
}
/// Validates every record in `buffer`, failing on the first corruption.
///
/// Records are parsed back to back starting at offset `0` until
/// `parse_record_at` reports the end of input.
///
/// On success the summary holds the highest sequence number seen (`0` if the
/// buffer contained no records) and the offset just past the last record.
///
/// # Errors
///
/// Returns the `WalCorruption` from the first record that fails to parse.
pub fn validate_tail_strict(buffer: &[u8]) -> Result<WalValidationSummary, WalCorruption> {
    let mut cursor: usize = 0;
    let mut highest_sequence: u64 = 0;

    loop {
        let record = match parse_record_at(buffer, cursor)? {
            Some(record) => record,
            None => break,
        };
        highest_sequence = highest_sequence.max(record.event.sequence());
        cursor += record.record_len;
    }

    Ok(WalValidationSummary { last_valid_sequence: highest_sequence, end_offset: cursor as u64 })
}
/// Result of a lenient validation pass, which stops at the first corruption
/// instead of failing.
#[must_use]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LenientValidationResult {
    /// Highest sequence number among the records that parsed cleanly; `0`
    /// when none did.
    pub last_valid_sequence: u64,
    /// Byte offset just past the last cleanly parsed record.
    pub last_valid_offset: u64,
    /// The corruption that ended the scan, or `None` if the whole buffer
    /// parsed cleanly.
    pub trailing_corruption: Option<WalCorruption>,
}
/// Validates records in `buffer` until the end of input or the first corruption.
///
/// Unlike `validate_tail_strict`, a corrupt record does not produce an error:
/// the scan stops there and the corruption is reported in
/// `trailing_corruption`, alongside the highest sequence number and end offset
/// of the records that parsed cleanly before it.
pub fn validate_tail_lenient(buffer: &[u8]) -> LenientValidationResult {
    let mut cursor = 0usize;
    let mut highest_sequence = 0u64;

    // The loop yields what ended the scan: `None` for a clean end of input,
    // `Some(..)` for the corruption encountered.
    let trailing_corruption = loop {
        match parse_record_at(buffer, cursor) {
            Ok(Some(record)) => {
                highest_sequence = highest_sequence.max(record.event.sequence());
                cursor += record.record_len;
            }
            Ok(None) => break None,
            Err(corruption) => break Some(corruption),
        }
    };

    LenientValidationResult {
        last_valid_sequence: highest_sequence,
        last_valid_offset: cursor as u64,
        trailing_corruption,
    }
}
#[cfg(test)]
mod tests {
    use actionqueue_core::ids::TaskId;
    use actionqueue_core::task::constraints::TaskConstraints;
    use actionqueue_core::task::metadata::TaskMetadata;
    use actionqueue_core::task::run_policy::RunPolicy;
    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};

    use super::*;
    use crate::wal::codec;
    use crate::wal::event::WalEventType;

    /// Builds a `TaskCreated` event carrying the given sequence number.
    fn event(seq: u64) -> WalEvent {
        let task_spec = TaskSpec::new(
            TaskId::new(),
            TaskPayload::with_content_type(vec![1, 2, 3], "application/octet-stream"),
            RunPolicy::Once,
            TaskConstraints::default(),
            TaskMetadata::default(),
        )
        .expect("valid test task");
        WalEvent::new(seq, WalEventType::TaskCreated { task_spec, timestamp: 0 })
    }

    /// Hand-assembles record bytes without the codec: the current codec version, the
    /// declared payload length, the CRC field, then `payload` as given.
    fn raw_record(declared_len: u32, crc: u32, payload: &[u8]) -> Vec<u8> {
        let mut record = Vec::new();
        record.extend_from_slice(&codec::VERSION.to_le_bytes());
        record.extend_from_slice(&declared_len.to_le_bytes());
        record.extend_from_slice(&crc.to_le_bytes());
        record.extend_from_slice(payload);
        record
    }

    #[test]
    fn validate_tail_strict_accepts_clean_wal() {
        let mut wal = Vec::new();
        for &seq in &[5u64, 9] {
            wal.extend_from_slice(&codec::encode(&event(seq)).expect("encode should succeed"));
        }

        let summary = validate_tail_strict(&wal).expect("clean WAL should validate");

        assert_eq!(summary.last_valid_sequence, 9);
        assert_eq!(summary.end_offset, wal.len() as u64);
    }

    #[test]
    fn parse_record_at_reports_incomplete_header() {
        let truncated = vec![1u8, 2u8, 3u8];

        let error = parse_record_at(&truncated, 0).expect_err("partial header must fail");

        assert_eq!(error, WalCorruption::new(0, WalCorruptionReasonCode::IncompleteHeader));
    }

    #[test]
    fn parse_record_at_reports_incomplete_payload() {
        // Header declares 10 payload bytes, but only 2 follow.
        let record = raw_record(10, 0, &[0u8; 2]);

        let error = parse_record_at(&record, 0).expect_err("partial payload must fail");

        assert_eq!(error, WalCorruption::new(0, WalCorruptionReasonCode::IncompletePayload));
    }

    #[test]
    fn parse_record_at_reports_unsupported_version() {
        let mut record = codec::encode(&event(1)).expect("encode should succeed");
        // Overwrite the version field of an otherwise valid record.
        record[0..4].copy_from_slice(&999u32.to_le_bytes());

        let error = parse_record_at(&record, 0).expect_err("unsupported version must fail");

        assert_eq!(error, WalCorruption::new(0, WalCorruptionReasonCode::UnsupportedVersion));
    }

    #[test]
    fn parse_record_at_reports_crc_mismatch() {
        let payload = b"nope";
        let record = raw_record(payload.len() as u32, 0xDEADBEEFu32, payload);

        let error = parse_record_at(&record, 0).expect_err("CRC mismatch must fail");

        assert_eq!(error, WalCorruption::new(0, WalCorruptionReasonCode::CrcMismatch));
    }

    #[test]
    fn parse_record_at_reports_decode_failure() {
        // The CRC field matches the payload, so the failure must come from decoding it.
        let payload = b"not valid postcard data";
        let record = raw_record(payload.len() as u32, crc32fast::hash(payload), payload);

        let error = parse_record_at(&record, 0).expect_err("invalid payload must fail");

        assert_eq!(error, WalCorruption::new(0, WalCorruptionReasonCode::DecodeFailure));
    }
}