actionqueue_storage/snapshot/
loader.rs1use std::fs::File;
10use std::io::Read;
11
12use crate::snapshot::mapping::{validate_snapshot, SnapshotMappingError};
13use crate::snapshot::model::Snapshot;
14
15pub trait SnapshotLoader {
17 fn load(&mut self) -> Result<Option<Snapshot>, SnapshotLoaderError>;
23}
24
25#[derive(Debug, Clone, PartialEq, Eq)]
27pub enum SnapshotLoaderError {
28 IoError(String),
30 DecodeError(String),
32 MappingError(SnapshotMappingError),
34 IncompatibleVersion {
36 expected: u32,
38 found: u32,
40 },
41 CrcMismatch {
43 expected: u32,
45 actual: u32,
47 },
48 NotFound,
50}
51
52impl std::fmt::Display for SnapshotLoaderError {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 match self {
55 SnapshotLoaderError::IoError(e) => write!(f, "I/O error: {e}"),
56 SnapshotLoaderError::DecodeError(e) => write!(f, "Decode error: {e}"),
57 SnapshotLoaderError::MappingError(e) => write!(f, "Mapping error: {e}"),
58 SnapshotLoaderError::IncompatibleVersion { expected, found } => {
59 write!(f, "Incompatible snapshot version: expected {expected}, found {found}")
60 }
61 SnapshotLoaderError::CrcMismatch { expected, actual } => {
62 write!(
63 f,
64 "Snapshot CRC-32 mismatch: expected {expected:#010x}, actual {actual:#010x}"
65 )
66 }
67 SnapshotLoaderError::NotFound => write!(f, "Snapshot file not found"),
68 }
69 }
70}
71
72impl std::error::Error for SnapshotLoaderError {}
73
74impl std::convert::From<std::io::Error> for SnapshotLoaderError {
75 fn from(err: std::io::Error) -> Self {
76 match err.kind() {
77 std::io::ErrorKind::NotFound => SnapshotLoaderError::NotFound,
78 _ => SnapshotLoaderError::IoError(err.to_string()),
79 }
80 }
81}
82
/// Loads a snapshot from a single file on the local filesystem.
pub struct SnapshotFsLoader {
    /// Location of the snapshot file.
    path: std::path::PathBuf,
    /// Version the file's version frame must equal for the load to proceed.
    version: u32,
}
102
103impl SnapshotFsLoader {
104 pub fn new(path: std::path::PathBuf) -> Self {
110 SnapshotFsLoader { path, version: 4 }
111 }
112
113 pub fn with_version(path: std::path::PathBuf, version: u32) -> Self {
123 SnapshotFsLoader { path, version }
124 }
125
126 fn open_file(&self) -> Result<Option<File>, SnapshotLoaderError> {
129 match File::open(&self.path) {
130 Ok(file) => Ok(Some(file)),
131 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
132 Err(e) => Err(SnapshotLoaderError::IoError(e.to_string())),
133 }
134 }
135
136 fn read_exact_or_eof(
142 file: &mut File,
143 buf: &mut [u8],
144 frame_name: &str,
145 ) -> Result<bool, SnapshotLoaderError> {
146 let mut total_read = 0;
147 while total_read < buf.len() {
148 match file.read(&mut buf[total_read..]) {
149 Ok(0) => {
150 return if total_read == 0 {
151 Ok(false)
152 } else {
153 Err(SnapshotLoaderError::DecodeError(format!(
154 "truncated snapshot {frame_name}: read {total_read} of {} bytes",
155 buf.len()
156 )))
157 };
158 }
159 Ok(n) => total_read += n,
160 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
161 Err(e) => return Err(SnapshotLoaderError::IoError(e.to_string())),
162 }
163 }
164 Ok(true)
165 }
166
167 fn read_version(file: &mut File) -> Result<Option<u32>, SnapshotLoaderError> {
172 let mut buf = [0u8; 4];
173 if Self::read_exact_or_eof(file, &mut buf, "version frame")? {
174 Ok(Some(u32::from_le_bytes(buf)))
175 } else {
176 Ok(None)
177 }
178 }
179
180 fn read_length(file: &mut File) -> Result<usize, SnapshotLoaderError> {
184 let mut buf = [0u8; 4];
185 if Self::read_exact_or_eof(file, &mut buf, "length frame")? {
186 Ok(u32::from_le_bytes(buf) as usize)
187 } else {
188 Err(SnapshotLoaderError::DecodeError(
189 "truncated snapshot: EOF after version frame, expected length frame".to_string(),
190 ))
191 }
192 }
193
194 fn read_crc(file: &mut File) -> Result<u32, SnapshotLoaderError> {
198 let mut buf = [0u8; 4];
199 if Self::read_exact_or_eof(file, &mut buf, "CRC-32 frame")? {
200 Ok(u32::from_le_bytes(buf))
201 } else {
202 Err(SnapshotLoaderError::DecodeError(
203 "truncated snapshot: EOF after length frame, expected CRC-32 frame".to_string(),
204 ))
205 }
206 }
207
208 fn read_payload(file: &mut File, length: usize) -> Result<Vec<u8>, SnapshotLoaderError> {
219 const MAX_REASONABLE_PAYLOAD: usize = 256 * 1024 * 1024; if length > MAX_REASONABLE_PAYLOAD {
222 return Err(SnapshotLoaderError::DecodeError(format!(
223 "snapshot payload length {length} exceeds maximum {MAX_REASONABLE_PAYLOAD}"
224 )));
225 }
226 let mut payload = vec![0u8; length];
227 file.read_exact(&mut payload).map_err(|e| SnapshotLoaderError::IoError(e.to_string()))?;
228 Ok(payload)
229 }
230
231 fn decode_snapshot(payload: &[u8]) -> Result<Snapshot, SnapshotLoaderError> {
241 serde_json::from_slice(payload).map_err(|e| {
242 SnapshotLoaderError::DecodeError(format!("Failed to decode snapshot: {e}"))
243 })
244 }
245}
246
247impl SnapshotLoader for SnapshotFsLoader {
248 fn load(&mut self) -> Result<Option<Snapshot>, SnapshotLoaderError> {
264 let mut file = match self.open_file()? {
265 Some(f) => f,
266 None => return Ok(None),
267 };
268
269 let version = match Self::read_version(&mut file)? {
271 Some(v) => v,
272 None => return Ok(None),
273 };
274
275 if version != self.version {
277 tracing::warn!(
278 expected = self.version,
279 found = version,
280 "snapshot version incompatible"
281 );
282 return Err(SnapshotLoaderError::IncompatibleVersion {
283 expected: self.version,
284 found: version,
285 });
286 }
287
288 let length = Self::read_length(&mut file)?;
290
291 let expected_crc = Self::read_crc(&mut file)?;
293
294 let payload = Self::read_payload(&mut file, length)?;
296
297 let actual_crc = crc32fast::hash(&payload);
299 if expected_crc != actual_crc {
300 tracing::warn!(
301 expected = format_args!("{expected_crc:#010x}"),
302 actual = format_args!("{actual_crc:#010x}"),
303 "snapshot CRC-32 mismatch"
304 );
305 return Err(SnapshotLoaderError::CrcMismatch {
306 expected: expected_crc,
307 actual: actual_crc,
308 });
309 }
310
311 let snapshot = match Self::decode_snapshot(&payload) {
313 Ok(s) => s,
314 Err(e) => {
315 tracing::warn!(error = %e, "snapshot decode failed");
316 return Err(e);
317 }
318 };
319 if let Err(e) = validate_snapshot(&snapshot) {
320 tracing::warn!(error = %e, "snapshot validation failed");
321 return Err(SnapshotLoaderError::MappingError(e));
322 }
323
324 Ok(Some(snapshot))
325 }
326}
327
#[cfg(test)]
mod tests {
    use std::fs;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;
    use crate::snapshot::mapping::{SnapshotMappingError, SNAPSHOT_SCHEMA_VERSION};
    use crate::snapshot::model::{SnapshotEngineControl, SnapshotMetadata};
    use crate::snapshot::writer::{SnapshotFsWriter, SnapshotWriter};

    /// Gives every temp file created by this test module a distinct suffix.
    static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// A unique temp-file path that is deleted again when the guard drops.
    ///
    /// The name combines the process id with a per-process counter so that
    /// concurrently running test processes cannot pick the same file, and the
    /// `Drop` impl cleans up even when an assertion fails mid-test.
    struct TempSnapshotFile {
        path: PathBuf,
    }

    impl TempSnapshotFile {
        /// Reserves a fresh path; any stale file at that path is removed.
        fn new() -> Self {
            let count = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
            let pid = std::process::id();
            let path = std::env::temp_dir()
                .join(format!("actionqueue_snapshot_loader_test_{pid}_{count}.tmp"));
            let _ = fs::remove_file(&path);
            Self { path }
        }

        /// Returns an owned copy of the path for APIs that take `PathBuf`.
        fn path(&self) -> PathBuf {
            self.path.clone()
        }

        /// Replaces the file contents with `bytes`.
        fn write_bytes(&self, bytes: &[u8]) {
            fs::write(&self.path, bytes).expect("test snapshot file should be writable");
        }
    }

    impl Drop for TempSnapshotFile {
        fn drop(&mut self) {
            // Best effort: the file may never have been created.
            let _ = fs::remove_file(&self.path);
        }
    }

    fn open_snapshot_writer(path: PathBuf) -> SnapshotFsWriter {
        SnapshotFsWriter::new(path).expect("Failed to open snapshot writer for loader test")
    }

    /// Builds an empty, internally consistent version-4 snapshot.
    fn create_test_snapshot() -> Snapshot {
        Snapshot {
            version: 4,
            timestamp: 1234567890,
            metadata: SnapshotMetadata {
                schema_version: SNAPSHOT_SCHEMA_VERSION,
                wal_sequence: 0,
                task_count: 0,
                run_count: 0,
            },
            tasks: Vec::new(),
            runs: Vec::new(),
            engine: SnapshotEngineControl::default(),
            dependency_declarations: Vec::new(),
            budgets: Vec::new(),
            subscriptions: Vec::new(),
            actors: Vec::new(),
            tenants: Vec::new(),
            role_assignments: Vec::new(),
            capability_grants: Vec::new(),
            ledger_entries: Vec::new(),
        }
    }

    /// Serializes the on-disk layout the loader reads: version, payload
    /// length, CRC-32 of the payload (all `u32` little-endian), then payload.
    fn framed(version: u32, payload: &[u8]) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(12 + payload.len());
        bytes.extend_from_slice(&version.to_le_bytes());
        bytes.extend_from_slice(&(payload.len() as u32).to_le_bytes());
        bytes.extend_from_slice(&crc32fast::hash(payload).to_le_bytes());
        bytes.extend_from_slice(payload);
        bytes
    }

    /// Writes `snapshot` to `path` through the real snapshot writer.
    fn write_with_writer(path: PathBuf, snapshot: &Snapshot) {
        let mut writer = open_snapshot_writer(path);
        writer.write(snapshot).expect("Write should succeed");
        writer.close().expect("Close should succeed");
    }

    #[test]
    fn test_new_loader_on_nonexistent_file() {
        let tmp = TempSnapshotFile::new();

        let mut loader = SnapshotFsLoader::new(tmp.path());

        assert!(matches!(loader.load(), Ok(None)));
    }

    #[test]
    fn test_load_missing_file_via_with_version() {
        let tmp = TempSnapshotFile::new();

        let mut loader = SnapshotFsLoader::with_version(tmp.path(), 3);

        assert!(matches!(loader.load(), Ok(None)));
    }

    #[test]
    fn test_load_empty_file_returns_none() {
        let tmp = TempSnapshotFile::new();
        tmp.write_bytes(&[]);

        let mut loader = SnapshotFsLoader::new(tmp.path());

        // EOF before the version frame is treated like a missing snapshot.
        assert!(matches!(loader.load(), Ok(None)));
    }

    #[test]
    fn test_load_returns_snapshot() {
        let tmp = TempSnapshotFile::new();
        write_with_writer(tmp.path(), &create_test_snapshot());

        let mut loader = SnapshotFsLoader::new(tmp.path());
        let loaded = loader.load().expect("Load should succeed");

        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap().version, 4);
    }

    #[test]
    fn test_load_incompatible_version() {
        let tmp = TempSnapshotFile::new();
        tmp.write_bytes(&framed(1, b"{}"));

        let mut loader = SnapshotFsLoader::new(tmp.path());
        let result = loader.load();

        assert!(matches!(result, Err(SnapshotLoaderError::IncompatibleVersion { .. })));
    }

    #[test]
    fn test_load_with_custom_version() {
        let tmp = TempSnapshotFile::new();

        let mut snapshot = create_test_snapshot();
        snapshot.version = 4;
        write_with_writer(tmp.path(), &snapshot);

        let mut loader = SnapshotFsLoader::with_version(tmp.path(), 4);
        let loaded = loader.load().expect("Load should succeed");

        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap().version, 4);
    }

    #[test]
    fn test_load_invalid_json() {
        let tmp = TempSnapshotFile::new();
        // Valid framing and CRC, but the payload is not parseable JSON.
        tmp.write_bytes(&framed(4, b"{invalid json}"));

        let mut loader = SnapshotFsLoader::new(tmp.path());
        let result = loader.load();

        assert!(matches!(result, Err(SnapshotLoaderError::DecodeError(_))));
    }

    #[test]
    fn test_load_rejects_snapshot_mapping_violation() {
        let tmp = TempSnapshotFile::new();

        // Declares one task while carrying none.
        let mut snapshot = create_test_snapshot();
        snapshot.metadata.task_count = 1;
        let payload = serde_json::to_vec(&snapshot).expect("snapshot should serialize");
        tmp.write_bytes(&framed(4, &payload));

        let mut loader = SnapshotFsLoader::new(tmp.path());
        let result = loader.load();

        assert!(matches!(
            result,
            Err(SnapshotLoaderError::MappingError(SnapshotMappingError::TaskCountMismatch {
                declared: 1,
                actual: 0
            }))
        ));
    }

    #[test]
    fn test_error_display() {
        assert_eq!(SnapshotLoaderError::NotFound.to_string(), "Snapshot file not found");
        assert_eq!(
            SnapshotLoaderError::IoError("test error".to_string()).to_string(),
            "I/O error: test error"
        );
        assert_eq!(
            SnapshotLoaderError::DecodeError("test error".to_string()).to_string(),
            "Decode error: test error"
        );
        assert_eq!(
            SnapshotLoaderError::MappingError(SnapshotMappingError::TaskCountMismatch {
                declared: 1,
                actual: 0
            })
            .to_string(),
            "Mapping error: snapshot task_count mismatch: declared 1, actual 0"
        );
        assert_eq!(
            SnapshotLoaderError::IncompatibleVersion { expected: 4, found: 1 }.to_string(),
            "Incompatible snapshot version: expected 4, found 1"
        );
        assert_eq!(
            SnapshotLoaderError::CrcMismatch { expected: 0xDEADBEEF, actual: 0x12345678 }
                .to_string(),
            "Snapshot CRC-32 mismatch: expected 0xdeadbeef, actual 0x12345678"
        );
    }

    #[test]
    fn test_load_rejects_old_version_3_format() {
        let tmp = TempSnapshotFile::new();

        // Version 3 layout as written here: version, length, payload (no CRC).
        let payload = b"{}";
        let mut bytes = Vec::new();
        bytes.extend_from_slice(&3u32.to_le_bytes());
        bytes.extend_from_slice(&(payload.len() as u32).to_le_bytes());
        bytes.extend_from_slice(payload);
        tmp.write_bytes(&bytes);

        let mut loader = SnapshotFsLoader::new(tmp.path());
        let result = loader.load();

        assert!(matches!(
            result,
            Err(SnapshotLoaderError::IncompatibleVersion { expected: 4, found: 3 })
        ));
    }

    #[test]
    fn test_load_detects_corrupted_payload() {
        let tmp = TempSnapshotFile::new();
        write_with_writer(tmp.path(), &create_test_snapshot());

        // Flip the first payload byte (offset 12 = after the three u32 frames).
        let mut bytes = fs::read(tmp.path()).expect("snapshot file should be readable");
        assert!(bytes.len() > 12, "snapshot file should have header + payload");
        bytes[12] ^= 0xFF;
        tmp.write_bytes(&bytes);

        let mut loader = SnapshotFsLoader::new(tmp.path());
        let result = loader.load();

        assert!(
            matches!(result, Err(SnapshotLoaderError::CrcMismatch { .. })),
            "corrupted payload should produce CrcMismatch, got: {result:?}"
        );
    }

    #[test]
    fn test_load_rejects_truncated_header() {
        let full = framed(4, b"{}");

        // Cut inside the version frame, right after it, inside the length
        // frame, and right after the length frame (missing CRC-32 frame).
        for cut in [2usize, 4, 6, 8] {
            let tmp = TempSnapshotFile::new();
            tmp.write_bytes(&full[..cut]);

            let mut loader = SnapshotFsLoader::new(tmp.path());
            let result = loader.load();

            assert!(
                matches!(result, Err(SnapshotLoaderError::DecodeError(_))),
                "header truncated to {cut} bytes should be a DecodeError, got: {result:?}"
            );
        }
    }

    #[test]
    fn test_load_rejects_truncated_payload() {
        let tmp = TempSnapshotFile::new();
        let full = framed(4, b"{}");
        tmp.write_bytes(&full[..full.len() - 1]);

        let mut loader = SnapshotFsLoader::new(tmp.path());
        let result = loader.load();

        // The loader must fail; which of the two variants reports a short
        // payload is left to the payload reader.
        assert!(
            matches!(
                result,
                Err(SnapshotLoaderError::DecodeError(_)) | Err(SnapshotLoaderError::IoError(_))
            ),
            "truncated payload should be rejected, got: {result:?}"
        );
    }

    #[test]
    fn test_load_rejects_oversized_length() {
        let tmp = TempSnapshotFile::new();

        // Header claims a payload of u32::MAX bytes, far above the 256 MiB cap.
        let mut bytes = Vec::new();
        bytes.extend_from_slice(&4u32.to_le_bytes());
        bytes.extend_from_slice(&u32::MAX.to_le_bytes());
        bytes.extend_from_slice(&0u32.to_le_bytes());
        tmp.write_bytes(&bytes);

        let mut loader = SnapshotFsLoader::new(tmp.path());
        let result = loader.load();

        assert!(
            matches!(result, Err(SnapshotLoaderError::DecodeError(_))),
            "oversized length should be a DecodeError, got: {result:?}"
        );
    }
}