1use crate::wal::event::WalEvent;
28
/// Format version written as the first header field of every encoded record.
///
/// `decode` rejects any record whose version field differs from this value.
pub const VERSION: u32 = 5;

/// Size in bytes of the fixed record header.
///
/// The header consists of three little-endian `u32` fields, in order:
/// version, payload length, and CRC-32 of the payload.
pub const HEADER_LEN: usize = 3 * std::mem::size_of::<u32>();

/// Largest payload length that the header's 32-bit length field can describe.
pub const MAX_PAYLOAD_SIZE: usize = u32::MAX as usize;
37
38#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum EncodeError {
41 Serialization(String),
43 PayloadTooLarge(usize),
45}
46
47impl std::fmt::Display for EncodeError {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 match self {
50 EncodeError::Serialization(msg) => write!(f, "WAL encode serialization error: {msg}"),
51 EncodeError::PayloadTooLarge(size) => {
52 write!(f, "WAL encode payload too large: {size} bytes (max {MAX_PAYLOAD_SIZE})")
53 }
54 }
55 }
56}
57
58impl std::error::Error for EncodeError {}
59
60pub fn encode(event: &WalEvent) -> Result<Vec<u8>, EncodeError> {
77 let payload =
79 postcard::to_allocvec(event).map_err(|e| EncodeError::Serialization(e.to_string()))?;
80
81 let crc = crc32fast::hash(&payload);
83
84 let mut bytes = Vec::with_capacity(HEADER_LEN + payload.len());
85
86 bytes.extend_from_slice(&VERSION.to_le_bytes());
88
89 let payload_len =
91 u32::try_from(payload.len()).map_err(|_| EncodeError::PayloadTooLarge(payload.len()))?;
92 bytes.extend_from_slice(&payload_len.to_le_bytes());
93
94 bytes.extend_from_slice(&crc.to_le_bytes());
96
97 bytes.extend_from_slice(&payload);
99
100 Ok(bytes)
101}
102
103pub fn decode(bytes: &[u8]) -> Result<WalEvent, DecodeError> {
117 if bytes.len() < HEADER_LEN + 1 {
119 return Err(DecodeError::InvalidLength(format!(
120 "Buffer too short: {} bytes (minimum {} required)",
121 bytes.len(),
122 HEADER_LEN + 1
123 )));
124 }
125
126 let version =
128 u32::from_le_bytes(bytes[0..4].try_into().map_err(|_| {
129 DecodeError::InvalidLength("version frame must be 4 bytes".to_string())
130 })?);
131
132 if version != VERSION {
133 return Err(DecodeError::UnsupportedVersion(format!(
134 "Unsupported WAL version: {version}. Current version: {VERSION}"
135 )));
136 }
137
138 let length = u32::from_le_bytes(
140 bytes[4..8]
141 .try_into()
142 .map_err(|_| DecodeError::InvalidLength("length frame must be 4 bytes".to_string()))?,
143 ) as usize;
144
145 if length > bytes.len() - HEADER_LEN {
147 return Err(DecodeError::InvalidLength(format!(
148 "Length frame indicates {} bytes but only {} available",
149 length,
150 bytes.len() - HEADER_LEN
151 )));
152 }
153
154 let expected_crc = u32::from_le_bytes(
156 bytes[8..12]
157 .try_into()
158 .map_err(|_| DecodeError::InvalidLength("CRC frame must be 4 bytes".to_string()))?,
159 );
160
161 let payload = &bytes[HEADER_LEN..HEADER_LEN + length];
163
164 let actual_crc = crc32fast::hash(payload);
166 if actual_crc != expected_crc {
167 return Err(DecodeError::CrcMismatch { expected: expected_crc, actual: actual_crc });
168 }
169
170 postcard::from_bytes(payload)
172 .map_err(|e| DecodeError::Decode(format!("Failed to deserialize WalEvent: {e}")))
173}
174
/// Failures that `decode` can report.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DecodeError {
    /// The version field does not match the version this code reads.
    UnsupportedVersion(String),
    /// The buffer is too short, or the length field claims more bytes than are present.
    InvalidLength(String),
    /// The payload checksum does not match the checksum stored in the header.
    CrcMismatch {
        /// CRC-32 value read from the header.
        expected: u32,
        /// CRC-32 value computed over the payload bytes.
        actual: u32,
    },
    /// The payload passed the checksum but could not be deserialized.
    Decode(String),
}

impl std::fmt::Display for DecodeError {
    /// Renders a human-readable description of the decode failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::CrcMismatch { expected, actual } => {
                // Both checksums are shown as zero-padded, `0x`-prefixed hex.
                write!(f, "CRC-32 mismatch: expected {expected:#010x}, actual {actual:#010x}")
            }
            Self::UnsupportedVersion(msg) => {
                write!(f, "unsupported version: {msg}")
            }
            Self::InvalidLength(msg) => {
                write!(f, "invalid length: {msg}")
            }
            Self::Decode(msg) => {
                write!(f, "decode error: {msg}")
            }
        }
    }
}

// No underlying source error is exposed; the default `Error` methods suffice.
impl std::error::Error for DecodeError {}
207
208#[cfg(test)]
209mod tests {
210 use actionqueue_core::budget::BudgetDimension;
211 use actionqueue_core::ids::{AttemptId, RunId, TaskId};
212 use actionqueue_core::mutation::AttemptResultKind;
213 use actionqueue_core::run::state::RunState;
214 use actionqueue_core::subscription::{EventFilter, SubscriptionId};
215 use actionqueue_core::task::constraints::TaskConstraints;
216 use actionqueue_core::task::metadata::TaskMetadata;
217 use actionqueue_core::task::run_policy::RunPolicy;
218 use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
219
220 use super::*;
221 use crate::wal::event::WalEventType;
222
223 #[test]
224 fn encode_produces_versioned_output() {
225 let event = WalEvent::new(42, create_test_event());
226
227 let encoded = encode(&event).expect("encode should succeed");
228
229 assert_eq!(&encoded[0..4], &VERSION.to_le_bytes());
231 }
232
233 #[test]
234 fn encode_length_frame_matches_payload() {
235 let event = WalEvent::new(100, create_test_event());
236
237 let encoded = encode(&event).expect("encode should succeed");
238
239 let length = u32::from_le_bytes(encoded[4..8].try_into().unwrap()) as usize;
241 let expected_length = encoded.len() - HEADER_LEN;
242
243 assert_eq!(length, expected_length);
244 }
245
246 #[test]
247 fn encode_includes_crc32() {
248 let event = WalEvent::new(42, create_test_event());
249
250 let encoded = encode(&event).expect("encode should succeed");
251
252 let header_crc = u32::from_le_bytes(encoded[8..12].try_into().unwrap());
254
255 let payload = &encoded[HEADER_LEN..];
257 let computed_crc = crc32fast::hash(payload);
258
259 assert_eq!(header_crc, computed_crc);
260 }
261
262 #[test]
263 fn decode_produces_equivalent_event() {
264 let original = WalEvent::new(123, create_test_event());
265
266 let encoded = encode(&original).expect("encode should succeed");
267 let decoded = decode(&encoded).expect("Failed to decode");
268
269 assert_eq!(original, decoded);
270 }
271
272 #[test]
273 fn encode_is_deterministic() {
274 let event = WalEvent::new(999, create_test_event());
275
276 let encoded1 = encode(&event).expect("encode should succeed");
277 let encoded2 = encode(&event).expect("encode should succeed");
278
279 assert_eq!(encoded1, encoded2);
280 }
281
282 #[test]
283 fn decode_rejects_invalid_version() {
284 let mut bytes = Vec::new();
285
286 bytes.extend_from_slice(&99u32.to_le_bytes());
288
289 let payload = b"test";
291 bytes.extend_from_slice(&(payload.len() as u32).to_le_bytes());
292
293 bytes.extend_from_slice(&crc32fast::hash(payload).to_le_bytes());
295
296 bytes.extend_from_slice(payload);
298
299 let result = decode(&bytes);
300
301 assert!(matches!(result, Err(DecodeError::UnsupportedVersion(_))));
302 }
303
304 #[test]
305 fn decode_rejects_buffer_too_short() {
306 let bytes = vec![0; 5]; let result = decode(&bytes);
309
310 assert!(matches!(result, Err(DecodeError::InvalidLength(_))));
311 }
312
313 #[test]
314 fn decode_rejects_length_exceeds_buffer() {
315 let mut bytes = Vec::new();
316
317 bytes.extend_from_slice(&VERSION.to_le_bytes());
319
320 bytes.extend_from_slice(&1000u32.to_le_bytes());
322
323 bytes.extend_from_slice(&0u32.to_le_bytes());
325
326 bytes.extend_from_slice(b"x");
328
329 let result = decode(&bytes);
330
331 assert!(matches!(result, Err(DecodeError::InvalidLength(_))));
332 }
333
334 #[test]
335 fn decode_rejects_crc_mismatch() {
336 let event = WalEvent::new(1, create_test_event());
337 let mut encoded = encode(&event).expect("encode should succeed");
338
339 if encoded.len() > HEADER_LEN {
341 encoded[HEADER_LEN] ^= 0xFF;
342 }
343
344 let result = decode(&encoded);
345 assert!(matches!(result, Err(DecodeError::CrcMismatch { .. })));
346 }
347
348 #[test]
349 fn decode_rejects_invalid_payload() {
350 let mut bytes = Vec::new();
351
352 let payload = b"NOT VALID POSTCARD";
353
354 bytes.extend_from_slice(&VERSION.to_le_bytes());
356
357 bytes.extend_from_slice(&(payload.len() as u32).to_le_bytes());
359
360 bytes.extend_from_slice(&crc32fast::hash(payload).to_le_bytes());
362
363 bytes.extend_from_slice(payload);
365
366 let result = decode(&bytes);
367
368 assert!(matches!(result, Err(DecodeError::Decode(_))));
369 }
370
371 #[test]
372 fn crc32_detects_single_bit_flip() {
373 let event = WalEvent::new(42, create_test_event());
374 let encoded = encode(&event).expect("encode should succeed");
375
376 for i in HEADER_LEN..encoded.len() {
378 let mut corrupted = encoded.clone();
379 corrupted[i] ^= 0x01;
380 assert!(decode(&corrupted).is_err(), "single bit flip at byte {i} should be detected");
381 }
382 }
383
    /// Encodes and decodes one sample of each listed `WalEventType` variant and
    /// checks that every decoded event equals the original.
    #[test]
    fn roundtrip_all_event_types() {
        // Shared identifiers reused across the sample events below.
        let task_id = TaskId::new();
        let run_id = RunId::new();
        let attempt_id = AttemptId::new();
        let task_spec = TaskSpec::new(
            task_id,
            TaskPayload::with_content_type(vec![1, 2, 3], "application/octet-stream"),
            RunPolicy::Once,
            TaskConstraints::default(),
            TaskMetadata::default(),
        )
        .expect("valid test task");

        let run_instance = actionqueue_core::run::run_instance::RunInstance::new_scheduled_with_id(
            run_id, task_id, 100, 100,
        )
        .expect("valid scheduled run");

        let events: Vec<WalEventType> = vec![
            // Task and run lifecycle.
            WalEventType::TaskCreated { task_spec, timestamp: 1000 },
            WalEventType::RunCreated { run_instance },
            WalEventType::RunStateChanged {
                run_id,
                previous_state: RunState::Scheduled,
                new_state: RunState::Ready,
                timestamp: 2000,
            },
            // Attempts: started, then finished with success and with failure.
            WalEventType::AttemptStarted { run_id, attempt_id, timestamp: 3000 },
            WalEventType::AttemptFinished {
                run_id,
                attempt_id,
                result: AttemptResultKind::Success,
                error: None,
                output: None,
                timestamp: 4000,
            },
            WalEventType::AttemptFinished {
                run_id,
                attempt_id,
                result: AttemptResultKind::Failure,
                error: Some("test error".to_string()),
                output: None,
                timestamp: 4001,
            },
            // Cancellation.
            WalEventType::TaskCanceled { task_id, timestamp: 5000 },
            WalEventType::RunCanceled { run_id, timestamp: 5001 },
            // Lease events.
            WalEventType::LeaseAcquired {
                run_id,
                owner: "worker-1".to_string(),
                expiry: 9000,
                timestamp: 6000,
            },
            WalEventType::LeaseHeartbeat {
                run_id,
                owner: "worker-1".to_string(),
                expiry: 10000,
                timestamp: 7000,
            },
            WalEventType::LeaseExpired {
                run_id,
                owner: "worker-1".to_string(),
                expiry: 10000,
                timestamp: 11000,
            },
            WalEventType::LeaseReleased {
                run_id,
                owner: "worker-1".to_string(),
                expiry: 10000,
                timestamp: 8000,
            },
            // Engine pause and resume.
            WalEventType::EnginePaused { timestamp: 12000 },
            WalEventType::EngineResumed { timestamp: 13000 },
            // Dependencies.
            WalEventType::DependencyDeclared {
                task_id,
                depends_on: vec![TaskId::new(), TaskId::new()],
                timestamp: 14000,
            },
            // Attempt finished with an output payload present.
            WalEventType::AttemptFinished {
                run_id,
                attempt_id,
                result: AttemptResultKind::Success,
                error: None,
                output: Some(b"test-output".to_vec()),
                timestamp: 15000,
            },
            // Run suspension and resumption.
            WalEventType::RunSuspended {
                run_id,
                reason: Some("budget exhausted".to_string()),
                timestamp: 16000,
            },
            WalEventType::RunResumed { run_id, timestamp: 17000 },
            // Budget events, using each budget dimension at least once.
            WalEventType::BudgetAllocated {
                task_id,
                dimension: BudgetDimension::Token,
                limit: 1000,
                timestamp: 18000,
            },
            WalEventType::BudgetConsumed {
                task_id,
                dimension: BudgetDimension::CostCents,
                amount: 250,
                timestamp: 19000,
            },
            WalEventType::BudgetExhausted {
                task_id,
                dimension: BudgetDimension::TimeSecs,
                timestamp: 20000,
            },
            WalEventType::BudgetReplenished {
                task_id,
                dimension: BudgetDimension::Token,
                new_limit: 2000,
                timestamp: 21000,
            },
            // Subscription events.
            WalEventType::SubscriptionCreated {
                subscription_id: SubscriptionId::new(),
                task_id,
                filter: EventFilter::TaskCompleted { task_id },
                timestamp: 22000,
            },
            WalEventType::SubscriptionTriggered {
                subscription_id: SubscriptionId::new(),
                timestamp: 23000,
            },
            WalEventType::SubscriptionCanceled {
                subscription_id: SubscriptionId::new(),
                timestamp: 24000,
            },
        ];

        // Sequence numbers start at 1, so the failure message names the 1-based position.
        for (seq, event_type) in events.into_iter().enumerate() {
            let original = WalEvent::new((seq + 1) as u64, event_type);
            let encoded = encode(&original).expect("encode should succeed");
            let decoded = decode(&encoded).expect("decode should succeed");
            assert_eq!(original, decoded, "roundtrip failed for event at sequence {}", seq + 1);
        }
    }
522
523 #[test]
527 fn payload_too_large_variant_exists_and_displays() {
528 let err = EncodeError::PayloadTooLarge(usize::MAX);
529 let msg = err.to_string();
530 assert!(msg.contains("too large"), "Display should mention 'too large': {msg}");
531 assert!(msg.contains(&usize::MAX.to_string()), "Display should include the size: {msg}");
532 }
533
534 #[test]
535 fn max_payload_size_equals_u32_max() {
536 assert_eq!(MAX_PAYLOAD_SIZE, u32::MAX as usize);
537 }
538
539 fn create_test_event() -> crate::wal::event::WalEventType {
540 crate::wal::event::WalEventType::RunStateChanged {
541 run_id: RunId::new(),
542 previous_state: RunState::Scheduled,
543 new_state: RunState::Running,
544 timestamp: 1_000_000,
545 }
546 }
547}