1use crate::wal::codec::{decode, DecodeError, HEADER_LEN, VERSION};
7use crate::wal::event::WalEvent;
8
9const WAL_HEADER_LEN: usize = HEADER_LEN;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13pub enum WalCorruptionReasonCode {
14 IncompleteHeader,
16 IncompletePayload,
18 UnsupportedVersion,
20 DecodeFailure,
22 CrcMismatch,
24}
25
26impl std::fmt::Display for WalCorruptionReasonCode {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 match self {
29 WalCorruptionReasonCode::IncompleteHeader => write!(f, "incomplete_header"),
30 WalCorruptionReasonCode::IncompletePayload => write!(f, "incomplete_payload"),
31 WalCorruptionReasonCode::UnsupportedVersion => write!(f, "unsupported_version"),
32 WalCorruptionReasonCode::DecodeFailure => write!(f, "decode_failure"),
33 WalCorruptionReasonCode::CrcMismatch => write!(f, "crc_mismatch"),
34 }
35 }
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct WalCorruption {
41 pub offset: u64,
43 pub reason: WalCorruptionReasonCode,
45}
46
47impl WalCorruption {
48 pub fn new(offset: usize, reason: WalCorruptionReasonCode) -> Self {
50 Self { offset: offset as u64, reason }
51 }
52}
53
54impl std::fmt::Display for WalCorruption {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 write!(f, "WAL corruption at offset {} ({})", self.offset, self.reason)
57 }
58}
59
60impl std::error::Error for WalCorruption {}
61
62#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64#[must_use]
65pub struct WalValidationSummary {
66 pub last_valid_sequence: u64,
68 pub end_offset: u64,
70}
71
72#[derive(Debug, Clone, PartialEq, Eq)]
74pub struct ParsedWalRecord {
75 pub event: WalEvent,
77 pub record_len: usize,
79}
80
81pub fn parse_record_at(
88 buffer: &[u8],
89 offset: usize,
90) -> Result<Option<ParsedWalRecord>, WalCorruption> {
91 if offset >= buffer.len() {
92 return Ok(None);
93 }
94
95 let remaining = buffer.len() - offset;
96 if remaining < WAL_HEADER_LEN {
97 return Err(WalCorruption::new(offset, WalCorruptionReasonCode::IncompleteHeader));
98 }
99
100 let version = u32::from_le_bytes(
101 buffer[offset..offset + 4]
102 .try_into()
103 .map_err(|_| WalCorruption::new(offset, WalCorruptionReasonCode::IncompleteHeader))?,
104 );
105 let payload_len = u32::from_le_bytes(
106 buffer[offset + 4..offset + 8]
107 .try_into()
108 .map_err(|_| WalCorruption::new(offset, WalCorruptionReasonCode::IncompleteHeader))?,
109 ) as usize;
110
111 let total_len = WAL_HEADER_LEN
112 .checked_add(payload_len)
113 .ok_or_else(|| WalCorruption::new(offset, WalCorruptionReasonCode::DecodeFailure))?;
114
115 let record_end = offset
116 .checked_add(total_len)
117 .ok_or_else(|| WalCorruption::new(offset, WalCorruptionReasonCode::DecodeFailure))?;
118 if record_end > buffer.len() {
119 return Err(WalCorruption::new(offset, WalCorruptionReasonCode::IncompletePayload));
120 }
121
122 if version != VERSION {
123 return Err(WalCorruption::new(offset, WalCorruptionReasonCode::UnsupportedVersion));
124 }
125
126 let event =
127 decode(&buffer[offset..offset + total_len]).map_err(|decode_error| match decode_error {
128 DecodeError::UnsupportedVersion(_) => {
129 WalCorruption::new(offset, WalCorruptionReasonCode::UnsupportedVersion)
130 }
131 DecodeError::CrcMismatch { .. } => {
132 WalCorruption::new(offset, WalCorruptionReasonCode::CrcMismatch)
133 }
134 DecodeError::InvalidLength(_) | DecodeError::Decode(_) => {
135 WalCorruption::new(offset, WalCorruptionReasonCode::DecodeFailure)
136 }
137 })?;
138
139 Ok(Some(ParsedWalRecord { event, record_len: total_len }))
140}
141
142pub fn validate_tail_strict(buffer: &[u8]) -> Result<WalValidationSummary, WalCorruption> {
147 let mut offset = 0usize;
148 let mut last_valid_sequence = 0u64;
149
150 while let Some(record) = parse_record_at(buffer, offset)? {
151 if record.event.sequence() > last_valid_sequence {
152 last_valid_sequence = record.event.sequence();
153 }
154 offset += record.record_len;
155 }
156
157 Ok(WalValidationSummary { last_valid_sequence, end_offset: offset as u64 })
158}
159
160#[derive(Debug, Clone, PartialEq, Eq)]
162#[must_use]
163pub struct LenientValidationResult {
164 pub last_valid_sequence: u64,
166 pub last_valid_offset: u64,
168 pub trailing_corruption: Option<WalCorruption>,
170}
171
172pub fn validate_tail_lenient(buffer: &[u8]) -> LenientValidationResult {
182 let mut offset = 0usize;
183 let mut last_valid_sequence = 0u64;
184
185 loop {
186 match parse_record_at(buffer, offset) {
187 Ok(None) => {
188 break;
190 }
191 Ok(Some(record)) => {
192 if record.event.sequence() > last_valid_sequence {
193 last_valid_sequence = record.event.sequence();
194 }
195 offset += record.record_len;
196 }
197 Err(corruption) => {
198 return LenientValidationResult {
200 last_valid_sequence,
201 last_valid_offset: offset as u64,
202 trailing_corruption: Some(corruption),
203 };
204 }
205 }
206 }
207
208 LenientValidationResult {
209 last_valid_sequence,
210 last_valid_offset: offset as u64,
211 trailing_corruption: None,
212 }
213}
214
215#[cfg(test)]
216mod tests {
217 use actionqueue_core::ids::TaskId;
218 use actionqueue_core::task::constraints::TaskConstraints;
219 use actionqueue_core::task::metadata::TaskMetadata;
220 use actionqueue_core::task::run_policy::RunPolicy;
221 use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
222
223 use super::*;
224 use crate::wal::codec;
225 use crate::wal::event::WalEventType;
226
227 fn event(seq: u64) -> WalEvent {
228 WalEvent::new(
229 seq,
230 WalEventType::TaskCreated {
231 task_spec: TaskSpec::new(
232 TaskId::new(),
233 TaskPayload::with_content_type(vec![1, 2, 3], "application/octet-stream"),
234 RunPolicy::Once,
235 TaskConstraints::default(),
236 TaskMetadata::default(),
237 )
238 .expect("valid test task"),
239 timestamp: 0,
240 },
241 )
242 }
243
244 #[test]
245 fn validate_tail_strict_accepts_clean_wal() {
246 let mut bytes = Vec::new();
247 bytes.extend_from_slice(&codec::encode(&event(5)).expect("encode should succeed"));
248 bytes.extend_from_slice(&codec::encode(&event(9)).expect("encode should succeed"));
249
250 let summary = validate_tail_strict(&bytes).expect("clean WAL should validate");
251 assert_eq!(summary.last_valid_sequence, 9);
252 assert_eq!(summary.end_offset, bytes.len() as u64);
253 }
254
255 #[test]
256 fn parse_record_at_reports_incomplete_header() {
257 let bytes = vec![1u8, 2u8, 3u8];
258 let error = parse_record_at(&bytes, 0).expect_err("partial header must fail");
259 assert_eq!(error.offset, 0);
260 assert_eq!(error.reason, WalCorruptionReasonCode::IncompleteHeader);
261 }
262
263 #[test]
264 fn parse_record_at_reports_incomplete_payload() {
265 let mut bytes = Vec::new();
266 bytes.extend_from_slice(&codec::VERSION.to_le_bytes());
267 bytes.extend_from_slice(&10u32.to_le_bytes()); bytes.extend_from_slice(&0u32.to_le_bytes()); bytes.extend_from_slice(&[0u8; 2]);
271
272 let error = parse_record_at(&bytes, 0).expect_err("partial payload must fail");
273 assert_eq!(error.offset, 0);
274 assert_eq!(error.reason, WalCorruptionReasonCode::IncompletePayload);
275 }
276
277 #[test]
278 fn parse_record_at_reports_unsupported_version() {
279 let mut bytes = codec::encode(&event(1)).expect("encode should succeed");
280 bytes[0..4].copy_from_slice(&999u32.to_le_bytes());
281
282 let error = parse_record_at(&bytes, 0).expect_err("unsupported version must fail");
283 assert_eq!(error.offset, 0);
284 assert_eq!(error.reason, WalCorruptionReasonCode::UnsupportedVersion);
285 }
286
287 #[test]
288 fn parse_record_at_reports_crc_mismatch() {
289 let payload = b"nope";
290 let mut bytes = Vec::new();
291 bytes.extend_from_slice(&codec::VERSION.to_le_bytes());
292 bytes.extend_from_slice(&(payload.len() as u32).to_le_bytes());
293 bytes.extend_from_slice(&0xDEADBEEFu32.to_le_bytes()); bytes.extend_from_slice(payload);
295
296 let error = parse_record_at(&bytes, 0).expect_err("CRC mismatch must fail");
297 assert_eq!(error.offset, 0);
298 assert_eq!(error.reason, WalCorruptionReasonCode::CrcMismatch);
299 }
300
301 #[test]
302 fn parse_record_at_reports_decode_failure() {
303 let payload = b"not valid postcard data";
304 let crc = crc32fast::hash(payload);
305 let mut bytes = Vec::new();
306 bytes.extend_from_slice(&codec::VERSION.to_le_bytes());
307 bytes.extend_from_slice(&(payload.len() as u32).to_le_bytes());
308 bytes.extend_from_slice(&crc.to_le_bytes()); bytes.extend_from_slice(payload);
310
311 let error = parse_record_at(&bytes, 0).expect_err("invalid payload must fail");
312 assert_eq!(error.offset, 0);
313 assert_eq!(error.reason, WalCorruptionReasonCode::DecodeFailure);
314 }
315}