Skip to main content

actionqueue_storage/snapshot/
loader.rs

1//! Snapshot loader interface and file system implementation.
2//!
3//! This module provides snapshot loading using a file system backend with
4//! versioned decoding for compatibility validation.
5//!
6//! The loader reads snapshots that were written using the same framing format
7//! as the WAL: version (4 bytes) + length (4 bytes) + CRC-32 (4 bytes) + JSON payload.
8
9use std::fs::File;
10use std::io::Read;
11
12use crate::snapshot::mapping::{validate_snapshot, SnapshotMappingError};
13use crate::snapshot::model::Snapshot;
14
15/// A snapshot loader that reads state from storage.
16pub trait SnapshotLoader {
17    /// Load a snapshot from storage.
18    ///
19    /// Returns `Ok(Some(Snapshot))` if a valid snapshot was loaded,
20    /// `Ok(None)` if no snapshot exists at the configured path,
21    /// or `Err` if an error occurred during loading.
22    fn load(&mut self) -> Result<Option<Snapshot>, SnapshotLoaderError>;
23}
24
25/// Errors that can occur during snapshot loading.
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub enum SnapshotLoaderError {
28    /// I/O error during load.
29    IoError(String),
30    /// The snapshot could not be decoded.
31    DecodeError(String),
32    /// The decoded snapshot violated mapping/parity invariants.
33    MappingError(SnapshotMappingError),
34    /// The snapshot version is incompatible.
35    IncompatibleVersion {
36        /// Snapshot version expected by the loader configuration.
37        expected: u32,
38        /// Snapshot version found in the persisted snapshot frame.
39        found: u32,
40    },
41    /// The snapshot CRC-32 checksum does not match the payload.
42    CrcMismatch {
43        /// CRC-32 checksum stored in the snapshot frame.
44        expected: u32,
45        /// CRC-32 checksum computed from the payload bytes.
46        actual: u32,
47    },
48    /// The snapshot file does not exist.
49    NotFound,
50}
51
52impl std::fmt::Display for SnapshotLoaderError {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        match self {
55            SnapshotLoaderError::IoError(e) => write!(f, "I/O error: {e}"),
56            SnapshotLoaderError::DecodeError(e) => write!(f, "Decode error: {e}"),
57            SnapshotLoaderError::MappingError(e) => write!(f, "Mapping error: {e}"),
58            SnapshotLoaderError::IncompatibleVersion { expected, found } => {
59                write!(f, "Incompatible snapshot version: expected {expected}, found {found}")
60            }
61            SnapshotLoaderError::CrcMismatch { expected, actual } => {
62                write!(
63                    f,
64                    "Snapshot CRC-32 mismatch: expected {expected:#010x}, actual {actual:#010x}"
65                )
66            }
67            SnapshotLoaderError::NotFound => write!(f, "Snapshot file not found"),
68        }
69    }
70}
71
72impl std::error::Error for SnapshotLoaderError {}
73
74impl std::convert::From<std::io::Error> for SnapshotLoaderError {
75    fn from(err: std::io::Error) -> Self {
76        match err.kind() {
77            std::io::ErrorKind::NotFound => SnapshotLoaderError::NotFound,
78            _ => SnapshotLoaderError::IoError(err.to_string()),
79        }
80    }
81}
82
/// A file system backed snapshot loader.
///
/// The loader opens a snapshot file and decodes it using the framed binary
/// format written by [`SnapshotFsWriter`](super::writer::SnapshotFsWriter):
/// - 4 bytes version (currently 4)
/// - 4 bytes length of payload
/// - 4 bytes CRC-32 of payload
/// - JSON-serialized Snapshot
///
/// Version checking is performed to ensure compatibility between the writer
/// and loader. If the snapshot version does not match the expected version,
/// `SnapshotLoaderError::IncompatibleVersion` is returned.
///
/// If the snapshot file does not exist (or is completely empty), `load()`
/// returns `Ok(None)` instead of failing.
///
/// The type is `Debug` and `Clone` so it can be logged and embedded in
/// other types that derive those traits.
#[derive(Debug, Clone)]
pub struct SnapshotFsLoader {
    /// Filesystem location of the snapshot file.
    path: std::path::PathBuf,
    /// Frame version the loader accepts; any other version is rejected.
    version: u32,
}
102
103impl SnapshotFsLoader {
104    /// Creates a new snapshot loader from the given path.
105    ///
106    /// # Arguments
107    ///
108    /// * `path` - The filesystem path to the snapshot file
109    pub fn new(path: std::path::PathBuf) -> Self {
110        SnapshotFsLoader { path, version: 4 }
111    }
112
113    /// Creates a new snapshot loader from the given path with a custom version.
114    ///
115    /// This is useful for testing or when the loader needs to support
116    /// multiple snapshot versions.
117    ///
118    /// # Arguments
119    ///
120    /// * `path` - The filesystem path to the snapshot file
121    /// * `version` - The expected snapshot version
122    pub fn with_version(path: std::path::PathBuf, version: u32) -> Self {
123        SnapshotFsLoader { path, version }
124    }
125
126    /// Opens the snapshot file for reading.
127    /// Returns Ok(None) if file doesn't exist, Ok(Some(file)) otherwise.
128    fn open_file(&self) -> Result<Option<File>, SnapshotLoaderError> {
129        match File::open(&self.path) {
130            Ok(file) => Ok(Some(file)),
131            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
132            Err(e) => Err(SnapshotLoaderError::IoError(e.to_string())),
133        }
134    }
135
136    /// Reads exactly `buf.len()` bytes, distinguishing clean EOF from partial reads.
137    ///
138    /// Returns `Ok(true)` if all bytes were read, `Ok(false)` if zero bytes
139    /// were available (clean EOF), or an error for partial reads (truncation)
140    /// and I/O failures.
141    fn read_exact_or_eof(
142        file: &mut File,
143        buf: &mut [u8],
144        frame_name: &str,
145    ) -> Result<bool, SnapshotLoaderError> {
146        let mut total_read = 0;
147        while total_read < buf.len() {
148            match file.read(&mut buf[total_read..]) {
149                Ok(0) => {
150                    return if total_read == 0 {
151                        Ok(false)
152                    } else {
153                        Err(SnapshotLoaderError::DecodeError(format!(
154                            "truncated snapshot {frame_name}: read {total_read} of {} bytes",
155                            buf.len()
156                        )))
157                    };
158                }
159                Ok(n) => total_read += n,
160                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
161                Err(e) => return Err(SnapshotLoaderError::IoError(e.to_string())),
162            }
163        }
164        Ok(true)
165    }
166
167    /// Reads the version frame from the file.
168    ///
169    /// Returns `Ok(Some(version))` if read successfully, `Ok(None)` on clean
170    /// EOF (empty file), or an error on truncation/I/O failure.
171    fn read_version(file: &mut File) -> Result<Option<u32>, SnapshotLoaderError> {
172        let mut buf = [0u8; 4];
173        if Self::read_exact_or_eof(file, &mut buf, "version frame")? {
174            Ok(Some(u32::from_le_bytes(buf)))
175        } else {
176            Ok(None)
177        }
178    }
179
180    /// Reads the length frame from the file.
181    ///
182    /// EOF at this point is a truncation error (version was already read).
183    fn read_length(file: &mut File) -> Result<usize, SnapshotLoaderError> {
184        let mut buf = [0u8; 4];
185        if Self::read_exact_or_eof(file, &mut buf, "length frame")? {
186            Ok(u32::from_le_bytes(buf) as usize)
187        } else {
188            Err(SnapshotLoaderError::DecodeError(
189                "truncated snapshot: EOF after version frame, expected length frame".to_string(),
190            ))
191        }
192    }
193
194    /// Reads the CRC-32 frame from the file.
195    ///
196    /// EOF at this point is a truncation error (version and length were already read).
197    fn read_crc(file: &mut File) -> Result<u32, SnapshotLoaderError> {
198        let mut buf = [0u8; 4];
199        if Self::read_exact_or_eof(file, &mut buf, "CRC-32 frame")? {
200            Ok(u32::from_le_bytes(buf))
201        } else {
202            Err(SnapshotLoaderError::DecodeError(
203                "truncated snapshot: EOF after length frame, expected CRC-32 frame".to_string(),
204            ))
205        }
206    }
207
208    /// Reads the payload from the file.
209    ///
210    /// # Arguments
211    ///
212    /// * `file` - The file to read from
213    /// * `length` - The expected length of the payload
214    ///
215    /// # Returns
216    ///
217    /// Returns the payload bytes, or an error if the payload cannot be read.
218    fn read_payload(file: &mut File, length: usize) -> Result<Vec<u8>, SnapshotLoaderError> {
219        // Guard against unreasonable allocation from corrupted length field
220        const MAX_REASONABLE_PAYLOAD: usize = 256 * 1024 * 1024; // 256 MiB
221        if length > MAX_REASONABLE_PAYLOAD {
222            return Err(SnapshotLoaderError::DecodeError(format!(
223                "snapshot payload length {length} exceeds maximum {MAX_REASONABLE_PAYLOAD}"
224            )));
225        }
226        let mut payload = vec![0u8; length];
227        file.read_exact(&mut payload).map_err(|e| SnapshotLoaderError::IoError(e.to_string()))?;
228        Ok(payload)
229    }
230
231    /// Decodes a snapshot from the payload bytes.
232    ///
233    /// # Arguments
234    ///
235    /// * `payload` - The JSON-encoded snapshot payload
236    ///
237    /// # Returns
238    ///
239    /// Returns the decoded snapshot, or an error if decoding fails.
240    fn decode_snapshot(payload: &[u8]) -> Result<Snapshot, SnapshotLoaderError> {
241        serde_json::from_slice(payload).map_err(|e| {
242            SnapshotLoaderError::DecodeError(format!("Failed to decode snapshot: {e}"))
243        })
244    }
245}
246
247impl SnapshotLoader for SnapshotFsLoader {
248    /// Load a snapshot from storage.
249    ///
250    /// The loader reads the version frame, validates it against the expected
251    /// version, then reads the length and payload frames. The payload is
252    /// decoded from JSON into a `Snapshot` struct.
253    ///
254    /// If the snapshot file does not exist, returns `Ok(None)`.
255    ///
256    /// # Errors
257    ///
258    /// Returns `SnapshotLoaderError::IoError` if an I/O error occurs.
259    /// Returns `SnapshotLoaderError::IncompatibleVersion` if the snapshot
260    /// version does not match the expected version.
261    /// Returns `SnapshotLoaderError::DecodeError` if the snapshot cannot
262    /// be decoded.
263    fn load(&mut self) -> Result<Option<Snapshot>, SnapshotLoaderError> {
264        let mut file = match self.open_file()? {
265            Some(f) => f,
266            None => return Ok(None),
267        };
268
269        // Read version frame (clean EOF = empty file = no snapshot)
270        let version = match Self::read_version(&mut file)? {
271            Some(v) => v,
272            None => return Ok(None),
273        };
274
275        // Validate version compatibility
276        if version != self.version {
277            tracing::warn!(
278                expected = self.version,
279                found = version,
280                "snapshot version incompatible"
281            );
282            return Err(SnapshotLoaderError::IncompatibleVersion {
283                expected: self.version,
284                found: version,
285            });
286        }
287
288        // Read length frame
289        let length = Self::read_length(&mut file)?;
290
291        // Read CRC-32 frame
292        let expected_crc = Self::read_crc(&mut file)?;
293
294        // Read payload
295        let payload = Self::read_payload(&mut file, length)?;
296
297        // Validate CRC-32
298        let actual_crc = crc32fast::hash(&payload);
299        if expected_crc != actual_crc {
300            tracing::warn!(
301                expected = format_args!("{expected_crc:#010x}"),
302                actual = format_args!("{actual_crc:#010x}"),
303                "snapshot CRC-32 mismatch"
304            );
305            return Err(SnapshotLoaderError::CrcMismatch {
306                expected: expected_crc,
307                actual: actual_crc,
308            });
309        }
310
311        // Decode snapshot
312        let snapshot = match Self::decode_snapshot(&payload) {
313            Ok(s) => s,
314            Err(e) => {
315                tracing::warn!(error = %e, "snapshot decode failed");
316                return Err(e);
317            }
318        };
319        if let Err(e) = validate_snapshot(&snapshot) {
320            tracing::warn!(error = %e, "snapshot validation failed");
321            return Err(SnapshotLoaderError::MappingError(e));
322        }
323
324        Ok(Some(snapshot))
325    }
326}
327
#[cfg(test)]
mod tests {
    use std::fs;
    use std::io::Write;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;
    use crate::snapshot::mapping::{SnapshotMappingError, SNAPSHOT_SCHEMA_VERSION};
    use crate::snapshot::model::{SnapshotEngineControl, SnapshotMetadata};
    use crate::snapshot::writer::{SnapshotFsWriter, SnapshotWriter};

    // Atomic counter to ensure unique test file paths within this process.
    static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// Returns a fresh, non-existent path in the system temp directory.
    ///
    /// The name combines the process id with a per-process counter so that
    /// test binaries running at the same time do not clobber each other's files.
    fn temp_snapshot_path() -> PathBuf {
        let count = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
        let pid = std::process::id();
        let path = std::env::temp_dir()
            .join(format!("actionqueue_snapshot_loader_test_{pid}_{count}.tmp"));
        // Clean up if exists from previous test runs
        let _ = fs::remove_file(&path);
        path
    }

    fn open_snapshot_writer(path: PathBuf) -> SnapshotFsWriter {
        SnapshotFsWriter::new(path).expect("Failed to open snapshot writer for loader test")
    }

    /// Builds an empty, internally consistent version-4 snapshot.
    fn create_test_snapshot() -> Snapshot {
        Snapshot {
            version: 4,
            timestamp: 1234567890,
            metadata: SnapshotMetadata {
                schema_version: SNAPSHOT_SCHEMA_VERSION,
                wal_sequence: 0,
                task_count: 0,
                run_count: 0,
            },
            tasks: Vec::new(),
            runs: Vec::new(),
            engine: SnapshotEngineControl::default(),
            dependency_declarations: Vec::new(),
            budgets: Vec::new(),
            subscriptions: Vec::new(),
            actors: Vec::new(),
            tenants: Vec::new(),
            role_assignments: Vec::new(),
            capability_grants: Vec::new(),
            ledger_entries: Vec::new(),
        }
    }

    /// Writes a complete frame: version, payload length, CRC-32 of the payload, payload.
    fn write_framed_file(path: &Path, version: u32, payload: &[u8]) {
        let length = u32::try_from(payload.len()).expect("test payload should fit in u32");
        let mut file = File::create(path).expect("Failed to create test file");
        file.write_all(&version.to_le_bytes()).expect("version frame write should succeed");
        file.write_all(&length.to_le_bytes()).expect("length frame write should succeed");
        file.write_all(&crc32fast::hash(payload).to_le_bytes())
            .expect("crc frame write should succeed");
        file.write_all(payload).expect("payload frame write should succeed");
        file.flush().expect("flush should succeed");
    }

    /// Persists `snapshot` at `path` through the real writer.
    fn write_snapshot_with_writer(path: &Path, snapshot: &Snapshot) {
        let mut writer = open_snapshot_writer(path.to_path_buf());
        writer.write(snapshot).expect("Write should succeed");
        writer.close().expect("Close should succeed");
    }

    #[test]
    fn test_new_loader_on_nonexistent_file() {
        // temp_snapshot_path() guarantees the file does not exist.
        let path = temp_snapshot_path();

        // Creating a loader should not panic for missing file
        let mut loader = SnapshotFsLoader::new(path);

        // load() should return Ok(None) for missing file
        let result = loader.load();
        assert!(matches!(result, Ok(None)));
    }

    #[test]
    fn test_load_missing_file_via_with_version() {
        // temp_snapshot_path() guarantees the file does not exist.
        let path = temp_snapshot_path();

        // Creating a loader with custom version should not panic for missing file
        let mut loader = SnapshotFsLoader::with_version(path, 3);

        // load() should return Ok(None) for missing file
        let result = loader.load();
        assert!(matches!(result, Ok(None)));
    }

    #[test]
    fn test_load_empty_file_returns_none() {
        let path = temp_snapshot_path();
        File::create(&path).expect("Failed to create test file");

        let mut loader = SnapshotFsLoader::new(path.clone());
        let result = loader.load();

        assert!(matches!(result, Ok(None)), "empty file should mean no snapshot, got: {result:?}");

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_load_returns_snapshot() {
        let path = temp_snapshot_path();

        // Write a snapshot first
        write_snapshot_with_writer(&path, &create_test_snapshot());

        // Load the snapshot
        let mut loader = SnapshotFsLoader::new(path.clone());
        let loaded = loader.load().expect("Load should succeed");

        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap().version, 4);

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_load_incompatible_version() {
        let path = temp_snapshot_path();

        // Version 1 is incompatible with the expected version 4.
        write_framed_file(&path, 1, b"{}");

        // Create loader with expected version 4
        let mut loader = SnapshotFsLoader::new(path.clone());
        let result = loader.load();

        assert!(matches!(result, Err(SnapshotLoaderError::IncompatibleVersion { .. })));

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_load_with_custom_version() {
        let path = temp_snapshot_path();

        // Write a snapshot with version 4
        let mut snapshot = create_test_snapshot();
        snapshot.version = 4;
        write_snapshot_with_writer(&path, &snapshot);

        // Load with custom version 4
        let mut loader = SnapshotFsLoader::with_version(path.clone(), 4);
        let loaded = loader.load().expect("Load should succeed");

        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap().version, 4);

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_load_invalid_json() {
        let path = temp_snapshot_path();

        // Valid framing (including a matching CRC-32) but invalid JSON.
        write_framed_file(&path, 4, b"{invalid json}");

        let mut loader = SnapshotFsLoader::new(path.clone());
        let result = loader.load();

        assert!(matches!(result, Err(SnapshotLoaderError::DecodeError(_))));

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_load_truncated_header_is_decode_error() {
        let path = temp_snapshot_path();

        // Version frame followed by only half of the length frame.
        {
            let mut file = File::create(&path).expect("Failed to create test file");
            file.write_all(&4u32.to_le_bytes()).unwrap();
            file.write_all(&[0u8, 0u8]).unwrap();
            file.flush().unwrap();
        }

        let mut loader = SnapshotFsLoader::new(path.clone());
        let result = loader.load();

        assert!(
            matches!(result, Err(SnapshotLoaderError::DecodeError(_))),
            "truncated header should produce DecodeError, got: {result:?}"
        );

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_load_rejects_oversized_length() {
        let path = temp_snapshot_path();

        // Header claiming a payload far beyond the loader's sanity limit.
        {
            let mut file = File::create(&path).expect("Failed to create test file");
            file.write_all(&4u32.to_le_bytes()).unwrap();
            file.write_all(&u32::MAX.to_le_bytes()).unwrap();
            file.write_all(&0u32.to_le_bytes()).unwrap();
            file.flush().unwrap();
        }

        let mut loader = SnapshotFsLoader::new(path.clone());
        let result = loader.load();

        assert!(
            matches!(result, Err(SnapshotLoaderError::DecodeError(_))),
            "oversized length should produce DecodeError, got: {result:?}"
        );

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_load_rejects_snapshot_mapping_violation() {
        let path = temp_snapshot_path();

        // Build a structurally valid snapshot JSON with inconsistent metadata:
        // one task declared, none present.
        let mut snapshot = create_test_snapshot();
        snapshot.metadata.task_count = 1;
        let payload = serde_json::to_vec(&snapshot).expect("snapshot should serialize");

        write_framed_file(&path, 4, &payload);

        let mut loader = SnapshotFsLoader::new(path.clone());
        let result = loader.load();

        assert!(matches!(
            result,
            Err(SnapshotLoaderError::MappingError(SnapshotMappingError::TaskCountMismatch {
                declared: 1,
                actual: 0
            }))
        ));

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_error_display() {
        assert_eq!(SnapshotLoaderError::NotFound.to_string(), "Snapshot file not found");
        assert_eq!(
            SnapshotLoaderError::IoError("test error".to_string()).to_string(),
            "I/O error: test error"
        );
        assert_eq!(
            SnapshotLoaderError::DecodeError("test error".to_string()).to_string(),
            "Decode error: test error"
        );
        assert_eq!(
            SnapshotLoaderError::MappingError(SnapshotMappingError::TaskCountMismatch {
                declared: 1,
                actual: 0
            })
            .to_string(),
            "Mapping error: snapshot task_count mismatch: declared 1, actual 0"
        );
        assert_eq!(
            SnapshotLoaderError::IncompatibleVersion { expected: 4, found: 1 }.to_string(),
            "Incompatible snapshot version: expected 4, found 1"
        );
        assert_eq!(
            SnapshotLoaderError::CrcMismatch { expected: 0xDEADBEEF, actual: 0x12345678 }
                .to_string(),
            "Snapshot CRC-32 mismatch: expected 0xdeadbeef, actual 0x12345678"
        );
    }

    #[test]
    fn test_load_rejects_old_version_3_format() {
        let path = temp_snapshot_path();

        // Write a file using the old version 3 format (no CRC-32 field)
        {
            let mut file = File::create(&path).expect("Failed to create test file");
            // Write version 3 (old format, incompatible with expected version 4)
            file.write_all(&3u32.to_le_bytes()).unwrap();
            let payload = b"{}";
            let length = u32::try_from(payload.len()).expect("test payload should fit in u32");
            file.write_all(&length.to_le_bytes()).unwrap();
            file.write_all(payload).unwrap();
            file.flush().unwrap();
        }

        let mut loader = SnapshotFsLoader::new(path.clone());
        let result = loader.load();

        assert!(matches!(
            result,
            Err(SnapshotLoaderError::IncompatibleVersion { expected: 4, found: 3 })
        ));

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_load_detects_corrupted_payload() {
        let path = temp_snapshot_path();

        // Write a valid snapshot using the writer
        write_snapshot_with_writer(&path, &create_test_snapshot());

        // Corrupt one byte in the payload (past the 12-byte header: version + length + crc)
        {
            let mut bytes = fs::read(&path).expect("snapshot file should be readable");
            assert!(bytes.len() > 12, "snapshot file should have header + payload");
            // Invert every bit of the first payload byte
            bytes[12] ^= 0xFF;
            fs::write(&path, &bytes).expect("corrupted snapshot should be writable");
        }

        let mut loader = SnapshotFsLoader::new(path.clone());
        let result = loader.load();

        assert!(
            matches!(result, Err(SnapshotLoaderError::CrcMismatch { .. })),
            "corrupted payload should produce CrcMismatch, got: {result:?}"
        );

        let _ = fs::remove_file(path);
    }
}