Skip to main content

velesdb_core/agent/
snapshot.rs

1//! Snapshot and versioning support for `AgentMemory`.
2//!
3//! Provides serialization/deserialization of `AgentMemory` state for:
4//! - Persistence across restarts
5//! - Rollback to previous versions
6//! - State transfer between instances
7//!
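//! # Snapshot Format
//!
//! Every length field is a little-endian `u64` giving the size of the section
//! that immediately follows it. The trailing CRC32 is a little-endian `u32`
//! computed over every preceding byte (from the magic through the end of the
//! TTL state).
//!
//! ```text
//! [Magic: "VAMM" 4 bytes]
//! [Version: 1 byte]
//! [Semantic state length: 8 bytes]
//! [Semantic state: N bytes]
//! [Episodic state length: 8 bytes]
//! [Episodic state: N bytes]
//! [Procedural state length: 8 bytes]
//! [Procedural state: N bytes]
//! [TTL state length: 8 bytes]
//! [TTL state: N bytes]
//! [CRC32: 4 bytes]
//! ```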
23
// Reason: `clippy::cast_possible_truncation` is allowed because section lengths are
// stored as `u64` and converted back to `usize` when a snapshot is read. Snapshot data
// is expected to come from an architecture-compatible writer, and every length is
// bounds-checked against the payload region (the data excluding the trailing CRC32)
// before any slicing takes place.
29#![allow(clippy::cast_possible_truncation)]
30
31use std::fs::File;
32use std::io::{self, Read, Write};
33use std::path::Path;
34
35use crate::storage::snapshot::crc32_hash;
36
/// Four-byte tag (`"VAMM"`) that opens every `AgentMemory` snapshot.
pub const SNAPSHOT_MAGIC: &[u8; 4] = b"VAMM";

/// Format version written into new snapshots and the only version accepted on load.
pub const SNAPSHOT_VERSION: u8 = 1;
42
/// Already-serialized contents of each `AgentMemory` subsystem.
///
/// The snapshot layer treats every field as an opaque byte blob: it only
/// length-prefixes the fields and checksums the result. Equality is derived so
/// that states can be compared directly, e.g. to verify a save/load round trip.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MemoryState {
    /// Serialized semantic memory entries.
    pub semantic: Vec<u8>,
    /// Serialized episodic memory entries.
    pub episodic: Vec<u8>,
    /// Serialized procedural memory entries.
    pub procedural: Vec<u8>,
    /// Serialized TTL state.
    pub ttl: Vec<u8>,
}
55
/// Summary information about a serialized snapshot.
///
/// Equality is derived so metadata values can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SnapshotMetadata {
    /// Snapshot format version.
    pub version: u8,
    /// Total size in bytes.
    pub total_size: usize,
    /// CRC32 checksum.
    pub checksum: u32,
}
66
/// Failure modes for creating, saving, and loading snapshots.
#[derive(Debug)]
#[non_exhaustive]
pub enum SnapshotError {
    /// An underlying filesystem read or write failed.
    Io(io::Error),
    /// The leading four bytes are not the expected snapshot magic.
    InvalidMagic,
    /// The snapshot declares a format version this build cannot read.
    UnsupportedVersion(u8),
    /// The checksum stored in the snapshot disagrees with the recomputed one.
    ChecksumMismatch {
        /// Checksum read from the snapshot's trailer.
        expected: u32,
        /// Checksum recomputed over the snapshot contents.
        actual: u32,
    },
    /// Malformed or truncated data, or another failure described by the message.
    CorruptedData(String),
}
87
88impl std::fmt::Display for SnapshotError {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        match self {
91            Self::Io(e) => write!(f, "IO error: {e}"),
92            Self::InvalidMagic => write!(f, "Invalid snapshot magic bytes"),
93            Self::UnsupportedVersion(v) => write!(f, "Unsupported snapshot version: {v}"),
94            Self::ChecksumMismatch { expected, actual } => {
95                write!(
96                    f,
97                    "Checksum mismatch: expected {expected:08x}, got {actual:08x}"
98                )
99            }
100            Self::CorruptedData(msg) => write!(f, "Corrupted data: {msg}"),
101        }
102    }
103}
104
105impl std::error::Error for SnapshotError {
106    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
107        match self {
108            Self::Io(e) => Some(e),
109            _ => None,
110        }
111    }
112}
113
114impl From<io::Error> for SnapshotError {
115    fn from(e: io::Error) -> Self {
116        Self::Io(e)
117    }
118}
119
120/// Creates a snapshot from memory state.
121///
122/// # Arguments
123///
124/// * `state` - Memory state to serialize
125///
126/// # Returns
127///
128/// Serialized snapshot bytes.
129#[must_use]
130pub fn create_snapshot(state: &MemoryState) -> Vec<u8> {
131    let total_size = 4
132        + 1
133        + 8
134        + state.semantic.len()
135        + 8
136        + state.episodic.len()
137        + 8
138        + state.procedural.len()
139        + 8
140        + state.ttl.len()
141        + 4;
142    let mut buf = Vec::with_capacity(total_size);
143
144    buf.extend_from_slice(SNAPSHOT_MAGIC);
145    buf.push(SNAPSHOT_VERSION);
146
147    buf.extend_from_slice(&(state.semantic.len() as u64).to_le_bytes());
148    buf.extend_from_slice(&state.semantic);
149
150    buf.extend_from_slice(&(state.episodic.len() as u64).to_le_bytes());
151    buf.extend_from_slice(&state.episodic);
152
153    buf.extend_from_slice(&(state.procedural.len() as u64).to_le_bytes());
154    buf.extend_from_slice(&state.procedural);
155
156    buf.extend_from_slice(&(state.ttl.len() as u64).to_le_bytes());
157    buf.extend_from_slice(&state.ttl);
158
159    let crc = crc32_hash(&buf);
160    buf.extend_from_slice(&crc.to_le_bytes());
161
162    buf
163}
164
165/// Loads a snapshot from bytes.
166///
167/// # Arguments
168///
169/// * `data` - Snapshot bytes
170///
171/// # Errors
172///
173/// Returns error if snapshot is invalid or corrupted.
174pub fn load_snapshot(data: &[u8]) -> Result<MemoryState, SnapshotError> {
175    validate_snapshot_header(data)?;
176
177    let mut offset = 5; // skip magic (4) + version (1)
178    let payload_end = data.len() - 4; // exclude trailing CRC
179
180    let semantic = read_section(data, &mut offset, payload_end, "Semantic")?;
181    let episodic = read_section(data, &mut offset, payload_end, "Episodic")?;
182    let procedural = read_section(data, &mut offset, payload_end, "Procedural")?;
183    let ttl = read_section(data, &mut offset, payload_end, "TTL")?;
184
185    Ok(MemoryState {
186        semantic,
187        episodic,
188        procedural,
189        ttl,
190    })
191}
192
193/// Validates magic bytes, version, and CRC32 checksum of a snapshot.
194fn validate_snapshot_header(data: &[u8]) -> Result<(), SnapshotError> {
195    const MIN_SIZE: usize = 4 + 1 + 8 + 8 + 8 + 8 + 4;
196
197    if data.len() < MIN_SIZE {
198        return Err(SnapshotError::CorruptedData(
199            "Snapshot too small".to_string(),
200        ));
201    }
202    if &data[0..4] != SNAPSHOT_MAGIC {
203        return Err(SnapshotError::InvalidMagic);
204    }
205    let version = data[4];
206    if version != SNAPSHOT_VERSION {
207        return Err(SnapshotError::UnsupportedVersion(version));
208    }
209
210    let stored_crc = u32::from_le_bytes(
211        data[data.len() - 4..]
212            .try_into()
213            .map_err(|_| SnapshotError::CorruptedData("Invalid CRC bytes".to_string()))?,
214    );
215    let computed_crc = crc32_hash(&data[..data.len() - 4]);
216    if stored_crc != computed_crc {
217        return Err(SnapshotError::ChecksumMismatch {
218            expected: stored_crc,
219            actual: computed_crc,
220        });
221    }
222    Ok(())
223}
224
225/// Reads a length-prefixed section from the snapshot data.
226fn read_section(
227    data: &[u8],
228    offset: &mut usize,
229    payload_end: usize,
230    label: &str,
231) -> Result<Vec<u8>, SnapshotError> {
232    let section_len = read_u64(&data[*offset..])? as usize;
233    *offset += 8;
234    if *offset + section_len > payload_end {
235        return Err(SnapshotError::CorruptedData(format!(
236            "{label} data truncated"
237        )));
238    }
239    let section = data[*offset..*offset + section_len].to_vec();
240    *offset += section_len;
241    Ok(section)
242}
243
244/// Saves a snapshot to a file.
245///
246/// Uses atomic write (temp file + rename) for safety.
247///
248/// # Errors
249///
250/// Returns error if file operations fail.
251pub fn save_snapshot_to_file<P: AsRef<Path>>(
252    path: P,
253    state: &MemoryState,
254) -> Result<(), SnapshotError> {
255    let path = path.as_ref();
256    let snapshot_data = create_snapshot(state);
257
258    let temp_path = path.with_extension("tmp");
259    let mut file = File::create(&temp_path)?;
260    file.write_all(&snapshot_data)?;
261    file.sync_all()?;
262    drop(file);
263
264    std::fs::rename(&temp_path, path)?;
265
266    Ok(())
267}
268
269/// Loads a snapshot from a file.
270///
271/// # Errors
272///
273/// Returns error if file operations fail or snapshot is invalid.
274pub fn load_snapshot_from_file<P: AsRef<Path>>(path: P) -> Result<MemoryState, SnapshotError> {
275    let mut file = File::open(path)?;
276    let mut data = Vec::new();
277    file.read_to_end(&mut data)?;
278    load_snapshot(&data)
279}
280
281/// Helper to read u64 from bytes.
282fn read_u64(data: &[u8]) -> Result<u64, SnapshotError> {
283    if data.len() < 8 {
284        return Err(SnapshotError::CorruptedData(
285            "Not enough bytes for u64".to_string(),
286        ));
287    }
288    Ok(u64::from_le_bytes(data[0..8].try_into().map_err(|_| {
289        SnapshotError::CorruptedData("Invalid u64 bytes".to_string())
290    })?))
291}
292
/// Snapshot manager for versioned snapshots.
///
/// Stores numbered snapshot files in a single directory and prunes the oldest
/// ones beyond a retention limit.
#[derive(Debug, Clone)]
pub struct SnapshotManager {
    /// Base directory for snapshots.
    base_path: std::path::PathBuf,
    /// Maximum number of snapshots to retain.
    max_snapshots: usize,
}
300
301impl SnapshotManager {
302    /// Creates a new snapshot manager.
303    ///
304    /// # Arguments
305    ///
306    /// * `base_path` - Directory for storing snapshots
307    /// * `max_snapshots` - Maximum number of snapshots to retain
308    pub fn new<P: AsRef<Path>>(base_path: P, max_snapshots: usize) -> Self {
309        Self {
310            base_path: base_path.as_ref().to_path_buf(),
311            max_snapshots,
312        }
313    }
314
315    /// Creates a new versioned snapshot.
316    ///
317    /// # Returns
318    ///
319    /// The version number of the created snapshot.
320    ///
321    /// # Errors
322    ///
323    /// Returns error if file operations fail.
324    pub fn create_versioned_snapshot(&self, state: &MemoryState) -> Result<u64, SnapshotError> {
325        std::fs::create_dir_all(&self.base_path)?;
326
327        let version = self.next_version()?;
328        let filename = format!("snapshot_{version:08}.vamm");
329        let path = self.base_path.join(filename);
330
331        save_snapshot_to_file(&path, state)?;
332        self.cleanup_old_snapshots()?;
333
334        Ok(version)
335    }
336
337    /// Loads the latest snapshot.
338    ///
339    /// # Errors
340    ///
341    /// Returns error if no snapshots exist or loading fails.
342    pub fn load_latest(&self) -> Result<(u64, MemoryState), SnapshotError> {
343        let version = self
344            .latest_version()?
345            .ok_or_else(|| SnapshotError::CorruptedData("No snapshots found".to_string()))?;
346        let state = self.load_version(version)?;
347        Ok((version, state))
348    }
349
350    /// Loads a specific snapshot version.
351    ///
352    /// # Errors
353    ///
354    /// Returns error if version doesn't exist or loading fails.
355    pub fn load_version(&self, version: u64) -> Result<MemoryState, SnapshotError> {
356        let filename = format!("snapshot_{version:08}.vamm");
357        let path = self.base_path.join(filename);
358        load_snapshot_from_file(&path)
359    }
360
361    /// Lists all available snapshot versions.
362    ///
363    /// # Errors
364    ///
365    /// Returns error if directory operations fail.
366    pub fn list_versions(&self) -> Result<Vec<u64>, SnapshotError> {
367        if !self.base_path.exists() {
368            return Ok(Vec::new());
369        }
370
371        let mut versions: Vec<u64> = std::fs::read_dir(&self.base_path)?
372            .filter_map(Result::ok)
373            .filter_map(|e| parse_snapshot_version(&e.file_name().to_string_lossy()))
374            .collect();
375
376        versions.sort_unstable();
377        Ok(versions)
378    }
379
380    /// Returns the latest snapshot version.
381    fn latest_version(&self) -> Result<Option<u64>, SnapshotError> {
382        Ok(self.list_versions()?.into_iter().max())
383    }
384
385    /// Returns the next version number.
386    fn next_version(&self) -> Result<u64, SnapshotError> {
387        Ok(self.latest_version()?.map_or(1, |v| v + 1))
388    }
389
390    /// Removes old snapshots beyond the retention limit.
391    fn cleanup_old_snapshots(&self) -> Result<(), SnapshotError> {
392        let versions = self.list_versions()?;
393        if versions.len() <= self.max_snapshots {
394            return Ok(());
395        }
396
397        let to_remove = versions.len() - self.max_snapshots;
398        for version in versions.into_iter().take(to_remove) {
399            let filename = format!("snapshot_{version:08}.vamm");
400            let path = self.base_path.join(filename);
401            let _ = std::fs::remove_file(path);
402        }
403
404        Ok(())
405    }
406}
407
/// Extracts a snapshot version number from a filename like `snapshot_00000001.vamm`.
///
/// Only the canonical spelling produced by `format!("snapshot_{version:08}.vamm")` is
/// accepted. Names that `u64` parsing would also accept are rejected: a leading `+`,
/// too few digits, or extra leading zeros. Such files could not be loaded or pruned
/// through a path rebuilt from the number, and could yield duplicate versions.
fn parse_snapshot_version(filename: &str) -> Option<u64> {
    let digits = filename.strip_prefix("snapshot_")?.strip_suffix(".vamm")?;
    let version = digits.parse::<u64>().ok()?;
    // Round-trip check: the name must be exactly what the writer would produce.
    if format!("{version:08}") == digits {
        Some(version)
    } else {
        None
    }
}