Skip to main content

amaters_server/
snapshot.rs

1//! Snapshot creation and restore for server-level state.
2//!
3//! Snapshots are compressed with LZ4 (via oxiarc-lz4) and stored as flat
4//! binary files under a configurable directory.  Each snapshot file is named
5//! `snapshot-{id:020}.bin` so that lexicographic directory listing already
6//! yields the snapshots in chronological order.
7//!
8//! A fast FNV-64 checksum is embedded in the header of every file so that a
9//! corrupted file can be detected at read time before the decompressed bytes
10//! are handed to the caller.
11
12use std::io::{Read as IoRead, Write};
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15
16use crate::server::{ServerError, ServerResult};
17
18// ─── Header constants ─────────────────────────────────────────────────────────
19
/// Signature placed at byte 0 of every snapshot file.
const MAGIC: &[u8; 8] = b"AMSNAP\x01\x00";
/// Total size of the fixed file header, in bytes (40).
///
/// Layout, in order:
/// - magic (8 B)
/// - snapshot id, little-endian `u64` (8 B)
/// - `timestamp_ms`, little-endian `u64` (8 B)
/// - original (uncompressed) size, little-endian `u64` (8 B)
/// - FNV-64 checksum of the *compressed* payload, little-endian `u64` (8 B)
const HEADER_LEN: usize = MAGIC.len() + 4 * std::mem::size_of::<u64>();
29
30// ─── FNV-64 ──────────────────────────────────────────────────────────────────
31
/// 64-bit FNV prime.
const FNV_PRIME: u64 = 0x0000_0100_0000_01B3;
/// 64-bit FNV offset basis (the hash of the empty input).
const FNV_OFFSET_BASIS: u64 = 0xCBF2_9CE4_8422_2325;

/// Hash `data` with 64-bit FNV.
///
/// Each step multiplies the running hash by the prime and then XORs in the
/// next byte. The order of those two operations defines the checksum stored
/// in snapshot headers, so it must not be swapped.
fn fnv64(data: &[u8]) -> u64 {
    data.iter().fold(FNV_OFFSET_BASIS, |acc, &octet| {
        acc.wrapping_mul(FNV_PRIME) ^ u64::from(octet)
    })
}
43
44// ─── Public types ─────────────────────────────────────────────────────────────
45
/// Summary of a single snapshot file on disk.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SnapshotMeta {
    /// Caller-assigned identifier; expected to increase monotonically.
    pub id: u64,
    /// Time the snapshot was written, as Unix milliseconds.
    pub timestamp_ms: u64,
    /// Length of the *compressed* payload in bytes (the header is not counted).
    pub size_bytes: u64,
    /// FNV-64 checksum computed over the compressed payload.
    pub checksum: u64,
}
58
59/// Manages a directory of numbered snapshot files.
60///
61/// All operations are synchronous (no async I/O) to keep things simple and
62/// testable.  In production the caller is responsible for spawning these calls
63/// onto a blocking thread via `tokio::task::spawn_blocking`.
64pub struct SnapshotManager {
65    snapshot_dir: PathBuf,
66    uploader: Option<Arc<dyn SnapshotUploader>>,
67}
68
69impl SnapshotManager {
70    /// Create a [`SnapshotManager`] that stores files in `snapshot_dir`.
71    ///
72    /// The directory is created if it does not yet exist.
73    pub fn new(snapshot_dir: impl Into<PathBuf>) -> ServerResult<Self> {
74        let dir: PathBuf = snapshot_dir.into();
75        std::fs::create_dir_all(&dir)?;
76        Ok(Self {
77            snapshot_dir: dir,
78            uploader: None,
79        })
80    }
81
82    /// Compress `data` with LZ4 and write it to disk as snapshot `id`.
83    ///
84    /// Returns [`SnapshotMeta`] describing the written file.
85    pub fn write_snapshot(&self, id: u64, data: &[u8]) -> ServerResult<SnapshotMeta> {
86        let compressed = oxiarc_lz4::compress(data)
87            .map_err(|e| ServerError::Storage(format!("Snapshot LZ4 compress failed: {}", e)))?;
88
89        let timestamp_ms = current_timestamp_ms();
90        let checksum = fnv64(&compressed);
91        let size_bytes = compressed.len() as u64;
92
93        let path = self.snapshot_path(id);
94        let mut file = std::fs::File::create(&path)?;
95
96        // Write header
97        let mut header = [0u8; HEADER_LEN];
98        header[0..8].copy_from_slice(MAGIC);
99        header[8..16].copy_from_slice(&id.to_le_bytes());
100        header[16..24].copy_from_slice(&timestamp_ms.to_le_bytes());
101        header[24..32].copy_from_slice(&(data.len() as u64).to_le_bytes());
102        header[32..40].copy_from_slice(&checksum.to_le_bytes());
103
104        file.write_all(&header)?;
105        file.write_all(&compressed)?;
106        file.sync_all()?;
107
108        Ok(SnapshotMeta {
109            id,
110            timestamp_ms,
111            size_bytes,
112            checksum,
113        })
114    }
115
116    /// Read back the snapshot with the given `id` and decompress it.
117    ///
118    /// Returns an error if the file is missing, the header is corrupted, the
119    /// checksum does not match, or LZ4 decompression fails.
120    pub fn read_snapshot(&self, id: u64) -> ServerResult<Vec<u8>> {
121        let path = self.snapshot_path(id);
122        let mut file = std::fs::File::open(&path)
123            .map_err(|e| ServerError::Storage(format!("Cannot open snapshot {}: {}", id, e)))?;
124
125        // Read and validate header
126        let mut header = [0u8; HEADER_LEN];
127        file.read_exact(&mut header).map_err(|e| {
128            ServerError::Storage(format!("Snapshot {} header read error: {}", id, e))
129        })?;
130
131        if &header[0..8] != MAGIC {
132            return Err(ServerError::Storage(format!(
133                "Snapshot {} has invalid magic bytes",
134                id
135            )));
136        }
137
138        let stored_id = u64::from_le_bytes(header[8..16].try_into().unwrap_or_default());
139        if stored_id != id {
140            return Err(ServerError::Storage(format!(
141                "Snapshot file id mismatch: expected {}, got {}",
142                id, stored_id
143            )));
144        }
145
146        let original_size =
147            u64::from_le_bytes(header[24..32].try_into().unwrap_or_default()) as usize;
148        let expected_checksum = u64::from_le_bytes(header[32..40].try_into().unwrap_or_default());
149
150        // Read compressed payload
151        let mut compressed = Vec::new();
152        file.read_to_end(&mut compressed).map_err(|e| {
153            ServerError::Storage(format!("Snapshot {} payload read error: {}", id, e))
154        })?;
155
156        // Verify checksum before decompressing
157        let actual_checksum = fnv64(&compressed);
158        if actual_checksum != expected_checksum {
159            return Err(ServerError::Storage(format!(
160                "Snapshot {} checksum mismatch: expected {:016x}, got {:016x}",
161                id, expected_checksum, actual_checksum
162            )));
163        }
164
165        // Decompress
166        let data = oxiarc_lz4::decompress(&compressed, original_size).map_err(|e| {
167            ServerError::Storage(format!("Snapshot {} LZ4 decompress failed: {}", id, e))
168        })?;
169
170        Ok(data)
171    }
172
173    /// List all available snapshots, sorted by ID descending (newest first).
174    pub fn list_snapshots(&self) -> ServerResult<Vec<SnapshotMeta>> {
175        let entries = std::fs::read_dir(&self.snapshot_dir)?;
176
177        let mut metas = Vec::new();
178        for entry in entries {
179            let entry = entry?;
180            let file_name = entry.file_name();
181            let name = file_name.to_string_lossy();
182
183            // Only process files matching our naming convention
184            if !name.starts_with("snapshot-") || !name.ends_with(".bin") {
185                continue;
186            }
187
188            let meta = self.read_meta_from_path(&entry.path())?;
189            metas.push(meta);
190        }
191
192        // Sort descending by id (newest first)
193        metas.sort_by_key(|m| std::cmp::Reverse(m.id));
194        Ok(metas)
195    }
196
197    /// Delete the snapshot file for `id`.
198    ///
199    /// Returns an error if the file does not exist.
200    pub fn delete_snapshot(&self, id: u64) -> ServerResult<()> {
201        let path = self.snapshot_path(id);
202        std::fs::remove_file(&path)
203            .map_err(|e| ServerError::Storage(format!("Failed to delete snapshot {}: {}", id, e)))
204    }
205
206    /// Build the canonical path for a given snapshot `id`.
207    fn snapshot_path(&self, id: u64) -> PathBuf {
208        self.snapshot_dir.join(format!("snapshot-{:020}.bin", id))
209    }
210
211    /// Read only the header bytes from an on-disk file to populate
212    /// [`SnapshotMeta`] without decompressing the payload.
213    fn read_meta_from_path(&self, path: &Path) -> ServerResult<SnapshotMeta> {
214        let mut file = std::fs::File::open(path).map_err(|e| {
215            ServerError::Storage(format!(
216                "Cannot open snapshot file {}: {}",
217                path.display(),
218                e
219            ))
220        })?;
221
222        let mut header = [0u8; HEADER_LEN];
223        file.read_exact(&mut header).map_err(|e| {
224            ServerError::Storage(format!("Cannot read header from {}: {}", path.display(), e))
225        })?;
226
227        if &header[0..8] != MAGIC {
228            return Err(ServerError::Storage(format!(
229                "Invalid magic in {}",
230                path.display()
231            )));
232        }
233
234        let id = u64::from_le_bytes(header[8..16].try_into().unwrap_or_default());
235        let timestamp_ms = u64::from_le_bytes(header[16..24].try_into().unwrap_or_default());
236        let checksum = u64::from_le_bytes(header[32..40].try_into().unwrap_or_default());
237
238        // Size = file size - header length
239        let file_len = std::fs::metadata(path)?.len();
240        let size_bytes = file_len.saturating_sub(HEADER_LEN as u64);
241
242        Ok(SnapshotMeta {
243            id,
244            timestamp_ms,
245            size_bytes,
246            checksum,
247        })
248    }
249}
250
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn current_timestamp_ms() -> u64 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
258
259// ─── Remote snapshot upload trait ────────────────────────────────────────────
260
261/// Trait for uploading snapshots to remote object storage.
262///
263/// The `LocalSnapshotUploader` is a reference implementation suitable for
264/// testing and single-node deployments.  An S3-backed implementation would
265/// live behind a feature flag and satisfy the same interface.
266pub trait SnapshotUploader: Send + Sync {
267    /// Upload raw `data` as snapshot `id` and return the remote URI.
268    fn upload_snapshot(&self, id: u64, data: &[u8]) -> ServerResult<String>;
269    /// Download a snapshot previously uploaded to `uri`.
270    fn download_snapshot(&self, uri: &str) -> ServerResult<Vec<u8>>;
271    /// List all snapshots in remote storage, sorted by id ascending.
272    fn list_remote_snapshots(&self) -> ServerResult<Vec<(u64, String)>>;
273}
274
275/// Local filesystem implementation of [`SnapshotUploader`].
276///
277/// URIs produced by this uploader have the form `local://<absolute_path>`.
278pub struct LocalSnapshotUploader {
279    base_path: PathBuf,
280}
281
282impl LocalSnapshotUploader {
283    /// Create a new uploader that stores remote snapshots under `base_path`.
284    ///
285    /// The directory is created if it does not already exist.
286    pub fn new(base_path: impl Into<PathBuf>) -> ServerResult<Self> {
287        let base_path = base_path.into();
288        std::fs::create_dir_all(&base_path)?;
289        Ok(Self { base_path })
290    }
291
292    /// Parse the snapshot id from a remote filename such as
293    /// `remote-snapshot-{id:020}.bin`.
294    fn parse_id_from_name(name: &str) -> Option<u64> {
295        let stem = name
296            .strip_prefix("remote-snapshot-")?
297            .strip_suffix(".bin")?;
298        stem.parse::<u64>().ok()
299    }
300}
301
302impl SnapshotUploader for LocalSnapshotUploader {
303    fn upload_snapshot(&self, id: u64, data: &[u8]) -> ServerResult<String> {
304        let path = self
305            .base_path
306            .join(format!("remote-snapshot-{:020}.bin", id));
307        std::fs::write(&path, data)?;
308        Ok(format!("local://{}", path.display()))
309    }
310
311    fn download_snapshot(&self, uri: &str) -> ServerResult<Vec<u8>> {
312        let path_str = uri
313            .strip_prefix("local://")
314            .ok_or_else(|| ServerError::Storage(format!("Unsupported URI scheme in '{}'", uri)))?;
315        std::fs::read(path_str)
316            .map_err(|e| ServerError::Storage(format!("Failed to download '{}': {}", uri, e)))
317    }
318
319    fn list_remote_snapshots(&self) -> ServerResult<Vec<(u64, String)>> {
320        let entries = std::fs::read_dir(&self.base_path)?;
321        let mut snapshots = Vec::new();
322        for entry in entries {
323            let entry = entry?;
324            let file_name = entry.file_name();
325            let name = file_name.to_string_lossy();
326            if let Some(id) = Self::parse_id_from_name(&name) {
327                let uri = format!("local://{}", entry.path().display());
328                snapshots.push((id, uri));
329            }
330        }
331        snapshots.sort_by_key(|(id, _)| *id);
332        Ok(snapshots)
333    }
334}
335
336impl SnapshotManager {
337    /// Attach an uploader to this manager.
338    pub fn set_uploader(&mut self, uploader: Arc<dyn SnapshotUploader>) {
339        self.uploader = Some(uploader);
340    }
341
342    /// Upload the local snapshot with `id` via the configured uploader.
343    ///
344    /// Returns the remote URI on success, or an error if no uploader is set
345    /// or the snapshot does not exist locally.
346    pub fn upload(&self, id: u64) -> ServerResult<String> {
347        let uploader = self.uploader.as_ref().ok_or_else(|| {
348            ServerError::Storage("No uploader configured on SnapshotManager".to_string())
349        })?;
350        let data = self.read_snapshot(id)?;
351        uploader.upload_snapshot(id, &data)
352    }
353
354    /// Download a snapshot from `uri` and store it locally under `local_id`.
355    ///
356    /// Returns the [`SnapshotMeta`] of the newly written local snapshot.
357    pub fn restore_from_remote(&self, uri: &str, local_id: u64) -> ServerResult<SnapshotMeta> {
358        let uploader = self.uploader.as_ref().ok_or_else(|| {
359            ServerError::Storage("No uploader configured on SnapshotManager".to_string())
360        })?;
361        let data = uploader.download_snapshot(uri)?;
362        self.write_snapshot(local_id, &data)
363    }
364}
365
366// ─── Unit tests ──────────────────────────────────────────────────────────────
367
#[cfg(test)]
mod tests {
    use super::*;

    /// Create an empty scratch directory for one test.
    ///
    /// The name includes the process id so concurrent runs of the test binary
    /// do not share directories. Anything left behind by an earlier aborted
    /// run is removed first, so each test starts from an empty directory.
    fn temp_dir(suffix: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(format!(
            "amaters_snapshot_test_{}_{}",
            std::process::id(),
            suffix
        ));
        // Ignore the result: the directory usually does not exist yet.
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).expect("create temp dir");
        dir
    }

    #[test]
    fn test_write_and_read_snapshot() {
        let dir = temp_dir("write_read");
        let sm = SnapshotManager::new(&dir).expect("create manager");

        // 1 MiB of test data
        let payload: Vec<u8> = (0u8..=255).cycle().take(1024 * 1024).collect();
        let meta = sm.write_snapshot(42, &payload).expect("write");
        assert_eq!(meta.id, 42);
        assert!(meta.size_bytes > 0);

        let restored = sm.read_snapshot(42).expect("read");
        assert_eq!(restored, payload);

        std::fs::remove_dir_all(&dir).ok();
    }

    #[test]
    fn test_list_snapshots_sorted() {
        let dir = temp_dir("list_sorted");
        let sm = SnapshotManager::new(&dir).expect("create manager");

        sm.write_snapshot(10, b"alpha").expect("write 10");
        sm.write_snapshot(30, b"gamma").expect("write 30");
        sm.write_snapshot(20, b"beta").expect("write 20");

        let list = sm.list_snapshots().expect("list");
        assert_eq!(list.len(), 3);
        // Newest (highest id) first
        assert_eq!(list[0].id, 30);
        assert_eq!(list[1].id, 20);
        assert_eq!(list[2].id, 10);

        std::fs::remove_dir_all(&dir).ok();
    }

    #[test]
    fn test_delete_snapshot() {
        let dir = temp_dir("delete");
        let sm = SnapshotManager::new(&dir).expect("create manager");

        sm.write_snapshot(1, b"to be deleted").expect("write");
        sm.write_snapshot(2, b"to be kept").expect("write");

        sm.delete_snapshot(1).expect("delete");

        let list = sm.list_snapshots().expect("list");
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].id, 2);

        // Attempting to read the deleted snapshot should fail
        assert!(sm.read_snapshot(1).is_err());

        std::fs::remove_dir_all(&dir).ok();
    }

    #[test]
    fn test_snapshot_checksum_corruption_detected() {
        let dir = temp_dir("checksum");
        let sm = SnapshotManager::new(&dir).expect("create manager");

        sm.write_snapshot(99, b"important data").expect("write");

        // Corrupt the payload bytes of the snapshot file
        let path = dir.join("snapshot-00000000000000000099.bin");
        let mut contents = std::fs::read(&path).expect("read file");
        // Fail loudly if the file is too short, rather than skipping the
        // corruption step and then blaming the checksum logic.
        assert!(
            contents.len() > HEADER_LEN + 4,
            "snapshot file too short to corrupt: {} bytes",
            contents.len()
        );
        // Flip a byte well past the header
        contents[HEADER_LEN + 4] ^= 0xFF;
        std::fs::write(&path, &contents).expect("write corrupted file");

        let result = sm.read_snapshot(99);
        assert!(
            result.is_err(),
            "Should detect checksum mismatch after corruption"
        );

        std::fs::remove_dir_all(&dir).ok();
    }

    #[test]
    fn test_fnv64_deterministic() {
        let a = fnv64(b"hello");
        let b = fnv64(b"hello");
        assert_eq!(a, b);
        assert_ne!(fnv64(b"hello"), fnv64(b"world"));
    }

    #[test]
    fn test_local_uploader_round_trip() {
        let local_dir = temp_dir("uploader_local");
        let remote_dir = temp_dir("uploader_remote");

        let mut sm = SnapshotManager::new(&local_dir).expect("create manager");
        let uploader = LocalSnapshotUploader::new(&remote_dir).expect("create uploader");
        sm.set_uploader(std::sync::Arc::new(uploader));

        let payload = b"round-trip test data 1234567890";
        sm.write_snapshot(777, payload)
            .expect("write local snapshot");

        let uri = sm.upload(777).expect("upload");
        assert!(uri.starts_with("local://"), "URI: {}", uri);

        // Restore through the same manager, under a different local id
        let meta = sm
            .restore_from_remote(&uri, 888)
            .expect("restore from remote");
        assert_eq!(meta.id, 888);

        let restored = sm.read_snapshot(888).expect("read restored");
        assert_eq!(restored.as_slice(), payload.as_ref());

        std::fs::remove_dir_all(&local_dir).ok();
        std::fs::remove_dir_all(&remote_dir).ok();
    }

    #[test]
    fn test_upload_requires_uploader_set() {
        let dir = temp_dir("no_uploader");
        let sm = SnapshotManager::new(&dir).expect("create manager");
        sm.write_snapshot(1, b"data").expect("write");
        let result = sm.upload(1);
        assert!(result.is_err(), "upload without uploader should fail");
        std::fs::remove_dir_all(&dir).ok();
    }
}
505}