Skip to main content

actionqueue_storage/wal/
codec.rs

1//! WAL encoding and decoding.
2//!
3//! This module defines the encode/decode contract for WAL events with deterministic ordering
4//! and version framing policy.
5//!
6//! # Format
7//!
8//! The WAL encoding uses a framed binary format with CRC-32 integrity checks:
9//!
10//! ```text
11//! +-------------+-------------+------------+------------------------+
12//! | Version (4B)| Length (4B) | CRC-32 (4B)| Serialized Event (N B) |
13//! +-------------+-------------+------------+------------------------+
14//! ```
15//!
16//! - **Version**: 4-byte LE unsigned integer indicating the format version (currently 5)
17//! - **Length**: 4-byte LE unsigned integer indicating the length of the serialized event
18//! - **CRC-32**: 4-byte LE CRC-32 checksum of the serialized event payload
19//! - **Serialized Event**: The postcard-encoded WalEvent structure
20//!
21//! This format ensures:
22//! - **Deterministic ordering**: The same event always produces the same bytes
23//! - **Version framing**: future format changes can be handled by checking the version
24//! - **Length prefixing**: allows efficient reading without backtracking
25//! - **Integrity**: CRC-32 detects corruption in the payload
26
27use crate::wal::event::WalEvent;
28
/// WAL format version written into the first header field of every record.
pub const VERSION: u32 = 5;

/// Byte length of the fixed record header: three consecutive `u32` fields
/// (version, payload length, CRC-32).
pub const HEADER_LEN: usize = 3 * std::mem::size_of::<u32>();

/// Largest payload length, in bytes, that the 4-byte length field can express.
pub const MAX_PAYLOAD_SIZE: usize = u32::MAX as usize;
37
/// Failure modes of WAL record encoding.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum EncodeError {
    /// Serializing the event failed; carries the serializer's error message.
    Serialization(String),
    /// The serialized payload is too long for the 4-byte length field
    /// (more than `u32::MAX` bytes); carries the offending length in bytes.
    PayloadTooLarge(usize),
}
46
47impl std::fmt::Display for EncodeError {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        match self {
50            EncodeError::Serialization(msg) => write!(f, "WAL encode serialization error: {msg}"),
51            EncodeError::PayloadTooLarge(size) => {
52                write!(f, "WAL encode payload too large: {size} bytes (max {MAX_PAYLOAD_SIZE})")
53            }
54        }
55    }
56}
57
58impl std::error::Error for EncodeError {}
59
60/// Encodes a WAL event into bytes.
61///
62/// The encoding includes:
63/// - A 4-byte version header
64/// - A 4-byte length frame
65/// - A 4-byte CRC-32 checksum of the payload
66/// - The postcard-serialized event payload
67///
68/// # Determinism
69///
70/// This function produces deterministic output - the same event will always
71/// produce the same bytes. This is critical for replay and recovery.
72///
73/// # Errors
74///
75/// Returns [`EncodeError::Serialization`] if the event cannot be serialized.
76pub fn encode(event: &WalEvent) -> Result<Vec<u8>, EncodeError> {
77    // Serialize the event using postcard for compact binary representation
78    let payload =
79        postcard::to_allocvec(event).map_err(|e| EncodeError::Serialization(e.to_string()))?;
80
81    // Compute CRC-32 of the payload
82    let crc = crc32fast::hash(&payload);
83
84    let mut bytes = Vec::with_capacity(HEADER_LEN + payload.len());
85
86    // Write version frame (4 bytes, little-endian)
87    bytes.extend_from_slice(&VERSION.to_le_bytes());
88
89    // Write length frame (4 bytes, little-endian)
90    let payload_len =
91        u32::try_from(payload.len()).map_err(|_| EncodeError::PayloadTooLarge(payload.len()))?;
92    bytes.extend_from_slice(&payload_len.to_le_bytes());
93
94    // Write CRC-32 (4 bytes, little-endian)
95    bytes.extend_from_slice(&crc.to_le_bytes());
96
97    // Write payload
98    bytes.extend_from_slice(&payload);
99
100    Ok(bytes)
101}
102
103/// Decodes a WAL event from bytes.
104///
105/// # Format
106///
107/// See the module documentation for the expected byte format.
108///
109/// # Errors
110///
111/// Returns an error if:
112/// - The version frame doesn't match the expected version
113/// - The length frame exceeds reasonable bounds
114/// - The CRC-32 checksum doesn't match
115/// - The payload cannot be deserialized to WalEvent
116pub fn decode(bytes: &[u8]) -> Result<WalEvent, DecodeError> {
117    // Minimum bytes needed: 4 (version) + 4 (length) + 4 (crc) + 1 (minimum payload)
118    if bytes.len() < HEADER_LEN + 1 {
119        return Err(DecodeError::InvalidLength(format!(
120            "Buffer too short: {} bytes (minimum {} required)",
121            bytes.len(),
122            HEADER_LEN + 1
123        )));
124    }
125
126    // Read version frame
127    let version =
128        u32::from_le_bytes(bytes[0..4].try_into().map_err(|_| {
129            DecodeError::InvalidLength("version frame must be 4 bytes".to_string())
130        })?);
131
132    if version != VERSION {
133        return Err(DecodeError::UnsupportedVersion(format!(
134            "Unsupported WAL version: {version}. Current version: {VERSION}"
135        )));
136    }
137
138    // Read length frame
139    let length = u32::from_le_bytes(
140        bytes[4..8]
141            .try_into()
142            .map_err(|_| DecodeError::InvalidLength("length frame must be 4 bytes".to_string()))?,
143    ) as usize;
144
145    // Validate length against available bytes
146    if length > bytes.len() - HEADER_LEN {
147        return Err(DecodeError::InvalidLength(format!(
148            "Length frame indicates {} bytes but only {} available",
149            length,
150            bytes.len() - HEADER_LEN
151        )));
152    }
153
154    // Read expected CRC-32
155    let expected_crc = u32::from_le_bytes(
156        bytes[8..12]
157            .try_into()
158            .map_err(|_| DecodeError::InvalidLength("CRC frame must be 4 bytes".to_string()))?,
159    );
160
161    // Extract payload
162    let payload = &bytes[HEADER_LEN..HEADER_LEN + length];
163
164    // Validate CRC-32
165    let actual_crc = crc32fast::hash(payload);
166    if actual_crc != expected_crc {
167        return Err(DecodeError::CrcMismatch { expected: expected_crc, actual: actual_crc });
168    }
169
170    // Deserialize the event
171    postcard::from_bytes(payload)
172        .map_err(|e| DecodeError::Decode(format!("Failed to deserialize WalEvent: {e}")))
173}
174
/// Failure modes of WAL record decoding.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DecodeError {
    /// The record's version frame is not the version this codec understands.
    UnsupportedVersion(String),
    /// The buffer is too short, or the length frame disagrees with the buffer.
    InvalidLength(String),
    /// The payload's computed CRC-32 differs from the one stored in the header.
    CrcMismatch {
        /// CRC-32 value read from the record header.
        expected: u32,
        /// CRC-32 value computed over the payload bytes.
        actual: u32,
    },
    /// The payload bytes could not be deserialized into an event.
    Decode(String),
}

/// Renders a [`DecodeError`] as a single-line, human-readable message.
impl std::fmt::Display for DecodeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnsupportedVersion(msg) => write!(f, "unsupported version: {msg}"),
            Self::InvalidLength(msg) => write!(f, "invalid length: {msg}"),
            // Checksums are shown as zero-padded, `0x`-prefixed 32-bit hex values.
            Self::CrcMismatch { expected, actual } => {
                write!(f, "CRC-32 mismatch: expected {expected:#010x}, actual {actual:#010x}")
            }
            Self::Decode(msg) => write!(f, "decode error: {msg}"),
        }
    }
}

// No underlying source error is retained, so the default trait methods suffice.
impl std::error::Error for DecodeError {}
207
#[cfg(test)]
mod tests {
    use actionqueue_core::budget::BudgetDimension;
    use actionqueue_core::ids::{AttemptId, RunId, TaskId};
    use actionqueue_core::mutation::AttemptResultKind;
    use actionqueue_core::run::state::RunState;
    use actionqueue_core::subscription::{EventFilter, SubscriptionId};
    use actionqueue_core::task::constraints::TaskConstraints;
    use actionqueue_core::task::metadata::TaskMetadata;
    use actionqueue_core::task::run_policy::RunPolicy;
    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};

    use super::*;
    use crate::wal::event::WalEventType;

    /// Builds a small, valid event body shared by the single-event tests.
    fn create_test_event() -> WalEventType {
        WalEventType::RunStateChanged {
            run_id: RunId::new(),
            previous_state: RunState::Scheduled,
            new_state: RunState::Running,
            timestamp: 1_000_000,
        }
    }

    /// Hand-assembles a record from explicit header fields so tests can build
    /// frames (wrong version, lying length, arbitrary payload) that `encode`
    /// itself would never produce.
    fn raw_record(version: u32, length: u32, crc: u32, payload: &[u8]) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(HEADER_LEN + payload.len());
        bytes.extend_from_slice(&version.to_le_bytes());
        bytes.extend_from_slice(&length.to_le_bytes());
        bytes.extend_from_slice(&crc.to_le_bytes());
        bytes.extend_from_slice(payload);
        bytes
    }

    /// The first header field of an encoded record is the current `VERSION`.
    #[test]
    fn encode_produces_versioned_output() {
        let event = WalEvent::new(42, create_test_event());

        let encoded = encode(&event).expect("encode should succeed");

        // First 4 bytes should be the current VERSION (little-endian)
        assert_eq!(&encoded[0..4], &VERSION.to_le_bytes());
    }

    /// The length frame equals the number of bytes that follow the header.
    #[test]
    fn encode_length_frame_matches_payload() {
        let event = WalEvent::new(100, create_test_event());

        let encoded = encode(&event).expect("encode should succeed");

        // Bytes 4..8 hold the payload length
        let length = u32::from_le_bytes(encoded[4..8].try_into().unwrap()) as usize;
        let expected_length = encoded.len() - HEADER_LEN;

        assert_eq!(length, expected_length);
    }

    /// The CRC frame equals the CRC-32 of the payload bytes.
    #[test]
    fn encode_includes_crc32() {
        let event = WalEvent::new(42, create_test_event());

        let encoded = encode(&event).expect("encode should succeed");

        // Extract CRC from header
        let header_crc = u32::from_le_bytes(encoded[8..12].try_into().unwrap());

        // Compute CRC from payload
        let payload = &encoded[HEADER_LEN..];
        let computed_crc = crc32fast::hash(payload);

        assert_eq!(header_crc, computed_crc);
    }

    /// Encoding followed by decoding yields the original event.
    #[test]
    fn decode_produces_equivalent_event() {
        let original = WalEvent::new(123, create_test_event());

        let encoded = encode(&original).expect("encode should succeed");
        let decoded = decode(&encoded).expect("Failed to decode");

        assert_eq!(original, decoded);
    }

    /// Encoding the same event twice yields identical bytes.
    #[test]
    fn encode_is_deterministic() {
        let event = WalEvent::new(999, create_test_event());

        let encoded1 = encode(&event).expect("encode should succeed");
        let encoded2 = encode(&event).expect("encode should succeed");

        assert_eq!(encoded1, encoded2);
    }

    /// A well-formed frame carrying an unknown version is rejected.
    #[test]
    fn decode_rejects_invalid_version() {
        let payload: &[u8] = b"test";
        let bytes = raw_record(99, payload.len() as u32, crc32fast::hash(payload), payload);

        let result = decode(&bytes);

        assert!(matches!(result, Err(DecodeError::UnsupportedVersion(_))));
    }

    /// Buffers shorter than header + 1 byte are rejected, including the empty
    /// buffer and a buffer holding exactly one header and no payload.
    #[test]
    fn decode_rejects_buffer_too_short() {
        for len in [0, 5, HEADER_LEN] {
            let bytes = vec![0u8; len];

            let result = decode(&bytes);

            assert!(
                matches!(result, Err(DecodeError::InvalidLength(_))),
                "buffer of {len} bytes should be rejected as too short"
            );
        }
    }

    /// A length frame larger than the bytes actually present is rejected.
    #[test]
    fn decode_rejects_length_exceeds_buffer() {
        // Claims 1000 payload bytes but supplies only one; the CRC is a dummy
        // because the length check runs before the checksum is examined.
        let bytes = raw_record(VERSION, 1000, 0, b"x");

        let result = decode(&bytes);

        assert!(matches!(result, Err(DecodeError::InvalidLength(_))));
    }

    /// Corrupting a payload byte makes the stored CRC disagree with the payload.
    #[test]
    fn decode_rejects_crc_mismatch() {
        let event = WalEvent::new(1, create_test_event());
        let mut encoded = encode(&event).expect("encode should succeed");

        // Fail loudly rather than silently skipping the corruption step.
        assert!(encoded.len() > HEADER_LEN, "encoded record must carry a payload");
        encoded[HEADER_LEN] ^= 0xFF;

        let result = decode(&encoded);
        assert!(matches!(result, Err(DecodeError::CrcMismatch { .. })));
    }

    /// A frame with a correct header but an undecodable payload is rejected at
    /// the deserialization step.
    #[test]
    fn decode_rejects_invalid_payload() {
        let payload: &[u8] = b"NOT VALID POSTCARD";
        // Valid version, length and CRC, so only deserialization can fail.
        let bytes = raw_record(VERSION, payload.len() as u32, crc32fast::hash(payload), payload);

        let result = decode(&bytes);

        assert!(matches!(result, Err(DecodeError::Decode(_))));
    }

    /// Flipping any single payload bit is caught by the checksum.
    #[test]
    fn crc32_detects_single_bit_flip() {
        let event = WalEvent::new(42, create_test_event());
        let encoded = encode(&event).expect("encode should succeed");

        // Flip a single bit in each payload byte. CRC-32 detects every
        // single-bit error, so the failure must be a checksum mismatch.
        for i in HEADER_LEN..encoded.len() {
            let mut corrupted = encoded.clone();
            corrupted[i] ^= 0x01;
            assert!(
                matches!(decode(&corrupted), Err(DecodeError::CrcMismatch { .. })),
                "single bit flip at byte {i} should be detected"
            );
        }
    }

    /// Every event variant listed below survives an encode/decode roundtrip.
    #[test]
    fn roundtrip_all_event_types() {
        let task_id = TaskId::new();
        let run_id = RunId::new();
        let attempt_id = AttemptId::new();
        let task_spec = TaskSpec::new(
            task_id,
            TaskPayload::with_content_type(vec![1, 2, 3], "application/octet-stream"),
            RunPolicy::Once,
            TaskConstraints::default(),
            TaskMetadata::default(),
        )
        .expect("valid test task");

        let run_instance = actionqueue_core::run::run_instance::RunInstance::new_scheduled_with_id(
            run_id, task_id, 100, 100,
        )
        .expect("valid scheduled run");

        let events: Vec<WalEventType> = vec![
            WalEventType::TaskCreated { task_spec, timestamp: 1000 },
            WalEventType::RunCreated { run_instance },
            WalEventType::RunStateChanged {
                run_id,
                previous_state: RunState::Scheduled,
                new_state: RunState::Ready,
                timestamp: 2000,
            },
            WalEventType::AttemptStarted { run_id, attempt_id, timestamp: 3000 },
            WalEventType::AttemptFinished {
                run_id,
                attempt_id,
                result: AttemptResultKind::Success,
                error: None,
                output: None,
                timestamp: 4000,
            },
            WalEventType::AttemptFinished {
                run_id,
                attempt_id,
                result: AttemptResultKind::Failure,
                error: Some("test error".to_string()),
                output: None,
                timestamp: 4001,
            },
            WalEventType::TaskCanceled { task_id, timestamp: 5000 },
            WalEventType::RunCanceled { run_id, timestamp: 5001 },
            WalEventType::LeaseAcquired {
                run_id,
                owner: "worker-1".to_string(),
                expiry: 9000,
                timestamp: 6000,
            },
            WalEventType::LeaseHeartbeat {
                run_id,
                owner: "worker-1".to_string(),
                expiry: 10000,
                timestamp: 7000,
            },
            WalEventType::LeaseExpired {
                run_id,
                owner: "worker-1".to_string(),
                expiry: 10000,
                timestamp: 11000,
            },
            WalEventType::LeaseReleased {
                run_id,
                owner: "worker-1".to_string(),
                expiry: 10000,
                timestamp: 8000,
            },
            WalEventType::EnginePaused { timestamp: 12000 },
            WalEventType::EngineResumed { timestamp: 13000 },
            WalEventType::DependencyDeclared {
                task_id,
                depends_on: vec![TaskId::new(), TaskId::new()],
                timestamp: 14000,
            },
            WalEventType::AttemptFinished {
                run_id,
                attempt_id,
                result: AttemptResultKind::Success,
                error: None,
                output: Some(b"test-output".to_vec()),
                timestamp: 15000,
            },
            WalEventType::RunSuspended {
                run_id,
                reason: Some("budget exhausted".to_string()),
                timestamp: 16000,
            },
            WalEventType::RunResumed { run_id, timestamp: 17000 },
            WalEventType::BudgetAllocated {
                task_id,
                dimension: BudgetDimension::Token,
                limit: 1000,
                timestamp: 18000,
            },
            WalEventType::BudgetConsumed {
                task_id,
                dimension: BudgetDimension::CostCents,
                amount: 250,
                timestamp: 19000,
            },
            WalEventType::BudgetExhausted {
                task_id,
                dimension: BudgetDimension::TimeSecs,
                timestamp: 20000,
            },
            WalEventType::BudgetReplenished {
                task_id,
                dimension: BudgetDimension::Token,
                new_limit: 2000,
                timestamp: 21000,
            },
            WalEventType::SubscriptionCreated {
                subscription_id: SubscriptionId::new(),
                task_id,
                filter: EventFilter::TaskCompleted { task_id },
                timestamp: 22000,
            },
            WalEventType::SubscriptionTriggered {
                subscription_id: SubscriptionId::new(),
                timestamp: 23000,
            },
            WalEventType::SubscriptionCanceled {
                subscription_id: SubscriptionId::new(),
                timestamp: 24000,
            },
        ];

        // Sequence numbers start at 1; pairing with a `u64` range avoids a cast.
        for (seq, event_type) in (1u64..).zip(events) {
            let original = WalEvent::new(seq, event_type);
            let encoded = encode(&original).expect("encode should succeed");
            let decoded = decode(&encoded).expect("decode should succeed");
            assert_eq!(original, decoded, "roundtrip failed for event at sequence {seq}");
        }
    }

    /// The payload-size limit is enforced at runtime by a checked `u32`
    /// conversion in `encode`. A payload longer than `u32::MAX` bytes is not
    /// practical to build in a test, so this only verifies that the error
    /// variant exists and that its message reports the size.
    #[test]
    fn payload_too_large_variant_exists_and_displays() {
        let err = EncodeError::PayloadTooLarge(usize::MAX);
        let msg = err.to_string();
        assert!(msg.contains("too large"), "Display should mention 'too large': {msg}");
        assert!(msg.contains(&usize::MAX.to_string()), "Display should include the size: {msg}");
    }

    /// The advertised maximum matches what the 4-byte length frame can hold.
    #[test]
    fn max_payload_size_equals_u32_max() {
        assert_eq!(MAX_PAYLOAD_SIZE, u32::MAX as usize);
    }
}