velesdb_core/agent/
snapshot.rs1#![allow(clippy::cast_possible_truncation)]
30
31use std::fs::File;
32use std::io::{self, Read, Write};
33use std::path::Path;
34
35use crate::storage::snapshot::crc32_hash;
36
37pub const SNAPSHOT_MAGIC: &[u8; 4] = b"VAMM";
39
40pub const SNAPSHOT_VERSION: u8 = 1;
42
43#[derive(Debug, Clone, Default)]
45pub struct MemoryState {
46 pub semantic: Vec<u8>,
48 pub episodic: Vec<u8>,
50 pub procedural: Vec<u8>,
52 pub ttl: Vec<u8>,
54}
55
56#[derive(Debug, Clone)]
58pub struct SnapshotMetadata {
59 pub version: u8,
61 pub total_size: usize,
63 pub checksum: u32,
65}
66
67#[derive(Debug)]
69#[non_exhaustive]
70pub enum SnapshotError {
71 Io(io::Error),
73 InvalidMagic,
75 UnsupportedVersion(u8),
77 ChecksumMismatch {
79 expected: u32,
81 actual: u32,
83 },
84 CorruptedData(String),
86}
87
88impl std::fmt::Display for SnapshotError {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 match self {
91 Self::Io(e) => write!(f, "IO error: {e}"),
92 Self::InvalidMagic => write!(f, "Invalid snapshot magic bytes"),
93 Self::UnsupportedVersion(v) => write!(f, "Unsupported snapshot version: {v}"),
94 Self::ChecksumMismatch { expected, actual } => {
95 write!(
96 f,
97 "Checksum mismatch: expected {expected:08x}, got {actual:08x}"
98 )
99 }
100 Self::CorruptedData(msg) => write!(f, "Corrupted data: {msg}"),
101 }
102 }
103}
104
105impl std::error::Error for SnapshotError {
106 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
107 match self {
108 Self::Io(e) => Some(e),
109 _ => None,
110 }
111 }
112}
113
114impl From<io::Error> for SnapshotError {
115 fn from(e: io::Error) -> Self {
116 Self::Io(e)
117 }
118}
119
120#[must_use]
130pub fn create_snapshot(state: &MemoryState) -> Vec<u8> {
131 let total_size = 4
132 + 1
133 + 8
134 + state.semantic.len()
135 + 8
136 + state.episodic.len()
137 + 8
138 + state.procedural.len()
139 + 8
140 + state.ttl.len()
141 + 4;
142 let mut buf = Vec::with_capacity(total_size);
143
144 buf.extend_from_slice(SNAPSHOT_MAGIC);
145 buf.push(SNAPSHOT_VERSION);
146
147 buf.extend_from_slice(&(state.semantic.len() as u64).to_le_bytes());
148 buf.extend_from_slice(&state.semantic);
149
150 buf.extend_from_slice(&(state.episodic.len() as u64).to_le_bytes());
151 buf.extend_from_slice(&state.episodic);
152
153 buf.extend_from_slice(&(state.procedural.len() as u64).to_le_bytes());
154 buf.extend_from_slice(&state.procedural);
155
156 buf.extend_from_slice(&(state.ttl.len() as u64).to_le_bytes());
157 buf.extend_from_slice(&state.ttl);
158
159 let crc = crc32_hash(&buf);
160 buf.extend_from_slice(&crc.to_le_bytes());
161
162 buf
163}
164
165pub fn load_snapshot(data: &[u8]) -> Result<MemoryState, SnapshotError> {
175 validate_snapshot_header(data)?;
176
177 let mut offset = 5; let payload_end = data.len() - 4; let semantic = read_section(data, &mut offset, payload_end, "Semantic")?;
181 let episodic = read_section(data, &mut offset, payload_end, "Episodic")?;
182 let procedural = read_section(data, &mut offset, payload_end, "Procedural")?;
183 let ttl = read_section(data, &mut offset, payload_end, "TTL")?;
184
185 Ok(MemoryState {
186 semantic,
187 episodic,
188 procedural,
189 ttl,
190 })
191}
192
193fn validate_snapshot_header(data: &[u8]) -> Result<(), SnapshotError> {
195 const MIN_SIZE: usize = 4 + 1 + 8 + 8 + 8 + 8 + 4;
196
197 if data.len() < MIN_SIZE {
198 return Err(SnapshotError::CorruptedData(
199 "Snapshot too small".to_string(),
200 ));
201 }
202 if &data[0..4] != SNAPSHOT_MAGIC {
203 return Err(SnapshotError::InvalidMagic);
204 }
205 let version = data[4];
206 if version != SNAPSHOT_VERSION {
207 return Err(SnapshotError::UnsupportedVersion(version));
208 }
209
210 let stored_crc = u32::from_le_bytes(
211 data[data.len() - 4..]
212 .try_into()
213 .map_err(|_| SnapshotError::CorruptedData("Invalid CRC bytes".to_string()))?,
214 );
215 let computed_crc = crc32_hash(&data[..data.len() - 4]);
216 if stored_crc != computed_crc {
217 return Err(SnapshotError::ChecksumMismatch {
218 expected: stored_crc,
219 actual: computed_crc,
220 });
221 }
222 Ok(())
223}
224
225fn read_section(
227 data: &[u8],
228 offset: &mut usize,
229 payload_end: usize,
230 label: &str,
231) -> Result<Vec<u8>, SnapshotError> {
232 let section_len = read_u64(&data[*offset..])? as usize;
233 *offset += 8;
234 if *offset + section_len > payload_end {
235 return Err(SnapshotError::CorruptedData(format!(
236 "{label} data truncated"
237 )));
238 }
239 let section = data[*offset..*offset + section_len].to_vec();
240 *offset += section_len;
241 Ok(section)
242}
243
244pub fn save_snapshot_to_file<P: AsRef<Path>>(
252 path: P,
253 state: &MemoryState,
254) -> Result<(), SnapshotError> {
255 let path = path.as_ref();
256 let snapshot_data = create_snapshot(state);
257
258 let temp_path = path.with_extension("tmp");
259 let mut file = File::create(&temp_path)?;
260 file.write_all(&snapshot_data)?;
261 file.sync_all()?;
262 drop(file);
263
264 std::fs::rename(&temp_path, path)?;
265
266 Ok(())
267}
268
269pub fn load_snapshot_from_file<P: AsRef<Path>>(path: P) -> Result<MemoryState, SnapshotError> {
275 let mut file = File::open(path)?;
276 let mut data = Vec::new();
277 file.read_to_end(&mut data)?;
278 load_snapshot(&data)
279}
280
281fn read_u64(data: &[u8]) -> Result<u64, SnapshotError> {
283 if data.len() < 8 {
284 return Err(SnapshotError::CorruptedData(
285 "Not enough bytes for u64".to_string(),
286 ));
287 }
288 Ok(u64::from_le_bytes(data[0..8].try_into().map_err(|_| {
289 SnapshotError::CorruptedData("Invalid u64 bytes".to_string())
290 })?))
291}
292
293pub struct SnapshotManager {
295 base_path: std::path::PathBuf,
297 max_snapshots: usize,
299}
300
301impl SnapshotManager {
302 pub fn new<P: AsRef<Path>>(base_path: P, max_snapshots: usize) -> Self {
309 Self {
310 base_path: base_path.as_ref().to_path_buf(),
311 max_snapshots,
312 }
313 }
314
315 pub fn create_versioned_snapshot(&self, state: &MemoryState) -> Result<u64, SnapshotError> {
325 std::fs::create_dir_all(&self.base_path)?;
326
327 let version = self.next_version()?;
328 let filename = format!("snapshot_{version:08}.vamm");
329 let path = self.base_path.join(filename);
330
331 save_snapshot_to_file(&path, state)?;
332 self.cleanup_old_snapshots()?;
333
334 Ok(version)
335 }
336
337 pub fn load_latest(&self) -> Result<(u64, MemoryState), SnapshotError> {
343 let version = self
344 .latest_version()?
345 .ok_or_else(|| SnapshotError::CorruptedData("No snapshots found".to_string()))?;
346 let state = self.load_version(version)?;
347 Ok((version, state))
348 }
349
350 pub fn load_version(&self, version: u64) -> Result<MemoryState, SnapshotError> {
356 let filename = format!("snapshot_{version:08}.vamm");
357 let path = self.base_path.join(filename);
358 load_snapshot_from_file(&path)
359 }
360
361 pub fn list_versions(&self) -> Result<Vec<u64>, SnapshotError> {
367 if !self.base_path.exists() {
368 return Ok(Vec::new());
369 }
370
371 let mut versions: Vec<u64> = std::fs::read_dir(&self.base_path)?
372 .filter_map(Result::ok)
373 .filter_map(|e| parse_snapshot_version(&e.file_name().to_string_lossy()))
374 .collect();
375
376 versions.sort_unstable();
377 Ok(versions)
378 }
379
380 fn latest_version(&self) -> Result<Option<u64>, SnapshotError> {
382 Ok(self.list_versions()?.into_iter().max())
383 }
384
385 fn next_version(&self) -> Result<u64, SnapshotError> {
387 Ok(self.latest_version()?.map_or(1, |v| v + 1))
388 }
389
390 fn cleanup_old_snapshots(&self) -> Result<(), SnapshotError> {
392 let versions = self.list_versions()?;
393 if versions.len() <= self.max_snapshots {
394 return Ok(());
395 }
396
397 let to_remove = versions.len() - self.max_snapshots;
398 for version in versions.into_iter().take(to_remove) {
399 let filename = format!("snapshot_{version:08}.vamm");
400 let path = self.base_path.join(filename);
401 let _ = std::fs::remove_file(path);
402 }
403
404 Ok(())
405 }
406}
407
408fn parse_snapshot_version(filename: &str) -> Option<u64> {
410 filename
411 .strip_prefix("snapshot_")
412 .and_then(|s| s.strip_suffix(".vamm"))
413 .and_then(|s| s.parse::<u64>().ok())
414}