// actionqueue_storage/wal/tail_validation.rs

//! Shared strict WAL framed-record tail validation.
//!
//! This module defines the single source of truth for classifying trailing WAL
//! corruption across writer bootstrap, reader iteration, and replay paths.
5
6use crate::wal::codec::{decode, DecodeError, HEADER_LEN, VERSION};
7use crate::wal::event::WalEvent;
8
/// Length in bytes of the framing header that prefixes every WAL record.
///
/// Local alias for the codec's `HEADER_LEN`. A record shorter than this is
/// classified as an incomplete header, and a record's total length is this
/// value plus the payload length declared in the header.
const WAL_HEADER_LEN: usize = HEADER_LEN;
10
/// Stable, machine-readable classification of why WAL tail validation failed.
///
/// Each variant renders through [`std::fmt::Display`] as a fixed `snake_case`
/// code (for example `crc_mismatch`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WalCorruptionReasonCode {
    /// Fewer bytes remain than a complete record header requires.
    IncompleteHeader,
    /// The header declares more payload bytes than the WAL still contains.
    IncompletePayload,
    /// The header carries a WAL version this code does not support.
    UnsupportedVersion,
    /// The frame is complete but could not be decoded into a [`WalEvent`].
    DecodeFailure,
    /// The CRC-32 stored in the header disagrees with the payload.
    CrcMismatch,
}

impl std::fmt::Display for WalCorruptionReasonCode {
    /// Writes the reason as its fixed `snake_case` code.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let code = match *self {
            Self::IncompleteHeader => "incomplete_header",
            Self::IncompletePayload => "incomplete_payload",
            Self::UnsupportedVersion => "unsupported_version",
            Self::DecodeFailure => "decode_failure",
            Self::CrcMismatch => "crc_mismatch",
        };
        f.write_str(code)
    }
}
37
38/// Typed WAL corruption details used by all restart paths.
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct WalCorruption {
41    /// Byte offset of the failing record start boundary.
42    pub offset: u64,
43    /// Stable machine-readable reason code.
44    pub reason: WalCorruptionReasonCode,
45}
46
47impl WalCorruption {
48    /// Creates typed corruption details at a record-start boundary.
49    pub fn new(offset: usize, reason: WalCorruptionReasonCode) -> Self {
50        Self { offset: offset as u64, reason }
51    }
52}
53
54impl std::fmt::Display for WalCorruption {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        write!(f, "WAL corruption at offset {} ({})", self.offset, self.reason)
57    }
58}
59
60impl std::error::Error for WalCorruption {}
61
/// Outcome of a validation pass that found no corruption.
#[must_use]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct WalValidationSummary {
    /// Highest sequence number observed across all records (`0` for an empty WAL).
    pub last_valid_sequence: u64,
    /// Byte offset just past the last complete decoded record.
    pub end_offset: u64,
}
71
72/// A decoded framed WAL record returned by the shared parser.
73#[derive(Debug, Clone, PartialEq, Eq)]
74pub struct ParsedWalRecord {
75    /// Decoded WAL event.
76    pub event: WalEvent,
77    /// Record byte length including framing header.
78    pub record_len: usize,
79}
80
81/// Parses one framed record at `offset`.
82///
83/// Returns:
84/// - `Ok(None)` when `offset` is exactly at EOF,
85/// - `Ok(Some(record))` for a complete decoded record,
86/// - `Err(WalCorruption)` for strict corruption classification at record boundary.
87pub fn parse_record_at(
88    buffer: &[u8],
89    offset: usize,
90) -> Result<Option<ParsedWalRecord>, WalCorruption> {
91    if offset >= buffer.len() {
92        return Ok(None);
93    }
94
95    let remaining = buffer.len() - offset;
96    if remaining < WAL_HEADER_LEN {
97        return Err(WalCorruption::new(offset, WalCorruptionReasonCode::IncompleteHeader));
98    }
99
100    let version = u32::from_le_bytes(
101        buffer[offset..offset + 4]
102            .try_into()
103            .map_err(|_| WalCorruption::new(offset, WalCorruptionReasonCode::IncompleteHeader))?,
104    );
105    let payload_len = u32::from_le_bytes(
106        buffer[offset + 4..offset + 8]
107            .try_into()
108            .map_err(|_| WalCorruption::new(offset, WalCorruptionReasonCode::IncompleteHeader))?,
109    ) as usize;
110
111    let total_len = WAL_HEADER_LEN
112        .checked_add(payload_len)
113        .ok_or_else(|| WalCorruption::new(offset, WalCorruptionReasonCode::DecodeFailure))?;
114
115    let record_end = offset
116        .checked_add(total_len)
117        .ok_or_else(|| WalCorruption::new(offset, WalCorruptionReasonCode::DecodeFailure))?;
118    if record_end > buffer.len() {
119        return Err(WalCorruption::new(offset, WalCorruptionReasonCode::IncompletePayload));
120    }
121
122    if version != VERSION {
123        return Err(WalCorruption::new(offset, WalCorruptionReasonCode::UnsupportedVersion));
124    }
125
126    let event =
127        decode(&buffer[offset..offset + total_len]).map_err(|decode_error| match decode_error {
128            DecodeError::UnsupportedVersion(_) => {
129                WalCorruption::new(offset, WalCorruptionReasonCode::UnsupportedVersion)
130            }
131            DecodeError::CrcMismatch { .. } => {
132                WalCorruption::new(offset, WalCorruptionReasonCode::CrcMismatch)
133            }
134            DecodeError::InvalidLength(_) | DecodeError::Decode(_) => {
135                WalCorruption::new(offset, WalCorruptionReasonCode::DecodeFailure)
136            }
137        })?;
138
139    Ok(Some(ParsedWalRecord { event, record_len: total_len }))
140}
141
142/// Strictly validates an entire WAL byte stream.
143///
144/// Validation succeeds only when all framed records are complete and decodable.
145/// Any trailing corruption fails with typed record-boundary diagnostics.
146pub fn validate_tail_strict(buffer: &[u8]) -> Result<WalValidationSummary, WalCorruption> {
147    let mut offset = 0usize;
148    let mut last_valid_sequence = 0u64;
149
150    while let Some(record) = parse_record_at(buffer, offset)? {
151        if record.event.sequence() > last_valid_sequence {
152            last_valid_sequence = record.event.sequence();
153        }
154        offset += record.record_len;
155    }
156
157    Ok(WalValidationSummary { last_valid_sequence, end_offset: offset as u64 })
158}
159
160/// Result of lenient WAL tail validation.
161#[derive(Debug, Clone, PartialEq, Eq)]
162#[must_use]
163pub struct LenientValidationResult {
164    /// Highest observed sequence in the last valid record (0 for empty WAL).
165    pub last_valid_sequence: u64,
166    /// Byte offset after the last complete, valid record.
167    pub last_valid_offset: u64,
168    /// If trailing corruption was found, details about it.
169    pub trailing_corruption: Option<WalCorruption>,
170}
171
172/// Leniently validates a WAL byte stream, reporting trailing corruption
173/// without failing.
174///
175/// Reads all complete, decodable records and reports the offset of the last
176/// valid record boundary. If trailing bytes cannot be parsed as a valid record,
177/// they are reported as `trailing_corruption` but the function still succeeds.
178///
179/// Mid-stream corruption (corruption followed by valid records) is NOT handled
180/// by this function — that would indicate a more serious integrity issue.
181pub fn validate_tail_lenient(buffer: &[u8]) -> LenientValidationResult {
182    let mut offset = 0usize;
183    let mut last_valid_sequence = 0u64;
184
185    loop {
186        match parse_record_at(buffer, offset) {
187            Ok(None) => {
188                // Clean EOF
189                break;
190            }
191            Ok(Some(record)) => {
192                if record.event.sequence() > last_valid_sequence {
193                    last_valid_sequence = record.event.sequence();
194                }
195                offset += record.record_len;
196            }
197            Err(corruption) => {
198                // Trailing corruption found
199                return LenientValidationResult {
200                    last_valid_sequence,
201                    last_valid_offset: offset as u64,
202                    trailing_corruption: Some(corruption),
203                };
204            }
205        }
206    }
207
208    LenientValidationResult {
209        last_valid_sequence,
210        last_valid_offset: offset as u64,
211        trailing_corruption: None,
212    }
213}
214
#[cfg(test)]
mod tests {
    use actionqueue_core::ids::TaskId;
    use actionqueue_core::task::constraints::TaskConstraints;
    use actionqueue_core::task::metadata::TaskMetadata;
    use actionqueue_core::task::run_policy::RunPolicy;
    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};

    use super::*;
    use crate::wal::codec;
    use crate::wal::event::WalEventType;

    /// Builds a minimal `TaskCreated` event carrying the given sequence number.
    fn event(seq: u64) -> WalEvent {
        WalEvent::new(
            seq,
            WalEventType::TaskCreated {
                task_spec: TaskSpec::new(
                    TaskId::new(),
                    TaskPayload::with_content_type(vec![1, 2, 3], "application/octet-stream"),
                    RunPolicy::Once,
                    TaskConstraints::default(),
                    TaskMetadata::default(),
                )
                .expect("valid test task"),
                timestamp: 0,
            },
        )
    }

    /// Appends the encoded frame for `event(seq)` to `bytes`.
    fn append_record(bytes: &mut Vec<u8>, seq: u64) {
        bytes.extend_from_slice(&codec::encode(&event(seq)).expect("encode should succeed"));
    }

    /// Hand-assembles a frame: version, declared payload length, CRC, then `payload`.
    ///
    /// `declared_len` is written as-is so tests can declare more bytes than they supply.
    fn raw_frame(version: u32, declared_len: u32, crc: u32, payload: &[u8]) -> Vec<u8> {
        let mut bytes = Vec::new();
        bytes.extend_from_slice(&version.to_le_bytes());
        bytes.extend_from_slice(&declared_len.to_le_bytes());
        bytes.extend_from_slice(&crc.to_le_bytes());
        bytes.extend_from_slice(payload);
        bytes
    }

    #[test]
    fn validate_tail_strict_accepts_clean_wal() {
        let mut bytes = Vec::new();
        append_record(&mut bytes, 5);
        append_record(&mut bytes, 9);

        let summary = validate_tail_strict(&bytes).expect("clean WAL should validate");
        assert_eq!(summary.last_valid_sequence, 9);
        assert_eq!(summary.end_offset, bytes.len() as u64);
    }

    #[test]
    fn validate_tail_strict_accepts_empty_wal() {
        let summary = validate_tail_strict(&[]).expect("empty WAL should validate");
        assert_eq!(summary.last_valid_sequence, 0);
        assert_eq!(summary.end_offset, 0);
    }

    #[test]
    fn validate_tail_strict_rejects_trailing_partial_header() {
        let mut bytes = Vec::new();
        append_record(&mut bytes, 7);
        let valid_len = bytes.len();
        bytes.extend_from_slice(&[0xAAu8; 3]);

        let error = validate_tail_strict(&bytes).expect_err("trailing garbage must fail");
        assert_eq!(error.offset, valid_len as u64);
        assert_eq!(error.reason, WalCorruptionReasonCode::IncompleteHeader);
    }

    #[test]
    fn parse_record_at_returns_none_at_eof() {
        let mut bytes = Vec::new();
        append_record(&mut bytes, 1);

        assert!(matches!(parse_record_at(&bytes, bytes.len()), Ok(None)));
        assert!(matches!(parse_record_at(&[], 0), Ok(None)));
    }

    #[test]
    fn parse_record_at_reports_incomplete_header() {
        let bytes = vec![1u8, 2u8, 3u8];
        let error = parse_record_at(&bytes, 0).expect_err("partial header must fail");
        assert_eq!(error.offset, 0);
        assert_eq!(error.reason, WalCorruptionReasonCode::IncompleteHeader);
    }

    #[test]
    fn parse_record_at_reports_incomplete_payload() {
        // Declares 10 payload bytes (dummy CRC) but supplies only 2.
        let bytes = raw_frame(codec::VERSION, 10, 0, &[0u8; 2]);

        let error = parse_record_at(&bytes, 0).expect_err("partial payload must fail");
        assert_eq!(error.offset, 0);
        assert_eq!(error.reason, WalCorruptionReasonCode::IncompletePayload);
    }

    #[test]
    fn parse_record_at_reports_unsupported_version() {
        let mut bytes = codec::encode(&event(1)).expect("encode should succeed");
        bytes[0..4].copy_from_slice(&999u32.to_le_bytes());

        let error = parse_record_at(&bytes, 0).expect_err("unsupported version must fail");
        assert_eq!(error.offset, 0);
        assert_eq!(error.reason, WalCorruptionReasonCode::UnsupportedVersion);
    }

    #[test]
    fn parse_record_at_reports_crc_mismatch() {
        let payload = b"nope";
        // 0xDEADBEEF is deliberately not the CRC of the payload.
        let bytes = raw_frame(codec::VERSION, payload.len() as u32, 0xDEADBEEF, payload);

        let error = parse_record_at(&bytes, 0).expect_err("CRC mismatch must fail");
        assert_eq!(error.offset, 0);
        assert_eq!(error.reason, WalCorruptionReasonCode::CrcMismatch);
    }

    #[test]
    fn parse_record_at_reports_decode_failure() {
        let payload = b"not valid postcard data";
        // A valid CRC lets the frame reach payload decoding, which must then fail.
        let crc = crc32fast::hash(payload);
        let bytes = raw_frame(codec::VERSION, payload.len() as u32, crc, payload);

        let error = parse_record_at(&bytes, 0).expect_err("invalid payload must fail");
        assert_eq!(error.offset, 0);
        assert_eq!(error.reason, WalCorruptionReasonCode::DecodeFailure);
    }

    #[test]
    fn validate_tail_lenient_reports_clean_wal() {
        let mut bytes = Vec::new();
        append_record(&mut bytes, 5);
        append_record(&mut bytes, 9);

        let result = validate_tail_lenient(&bytes);
        assert_eq!(result.last_valid_sequence, 9);
        assert_eq!(result.last_valid_offset, bytes.len() as u64);
        assert_eq!(result.trailing_corruption, None);
    }

    #[test]
    fn validate_tail_lenient_accepts_empty_wal() {
        let result = validate_tail_lenient(&[]);
        assert_eq!(result.last_valid_sequence, 0);
        assert_eq!(result.last_valid_offset, 0);
        assert_eq!(result.trailing_corruption, None);
    }

    #[test]
    fn validate_tail_lenient_reports_trailing_corruption() {
        let mut bytes = Vec::new();
        // The second sequence is lower, so the result must track the maximum, not the last.
        append_record(&mut bytes, 4);
        append_record(&mut bytes, 2);
        let valid_len = bytes.len();
        bytes.extend_from_slice(&[0xAAu8; 3]);

        let result = validate_tail_lenient(&bytes);
        assert_eq!(result.last_valid_sequence, 4);
        assert_eq!(result.last_valid_offset, valid_len as u64);
        assert_eq!(
            result.trailing_corruption,
            Some(WalCorruption::new(valid_len, WalCorruptionReasonCode::IncompleteHeader))
        );
    }

    #[test]
    fn wal_corruption_display_includes_offset_and_reason() {
        let corruption = WalCorruption::new(12, WalCorruptionReasonCode::CrcMismatch);
        assert_eq!(corruption.to_string(), "WAL corruption at offset 12 (crc_mismatch)");
    }
}
315}