1use std::io::{Read as IoRead, Write};
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15
16use crate::server::{ServerError, ServerResult};
17
18const MAGIC: &[u8; 8] = b"AMSNAP\x01\x00";
22const HEADER_LEN: usize = 40;
29
30const FNV_PRIME: u64 = 0x00000100_000001B3;
33const FNV_OFFSET_BASIS: u64 = 0xcbf29ce4_84222325;
34
35fn fnv64(data: &[u8]) -> u64 {
36 let mut hash = FNV_OFFSET_BASIS;
37 for byte in data {
38 hash = hash.wrapping_mul(FNV_PRIME);
39 hash ^= u64::from(*byte);
40 }
41 hash
42}
43
44#[derive(Debug, Clone, PartialEq, Eq)]
48pub struct SnapshotMeta {
49 pub id: u64,
51 pub timestamp_ms: u64,
53 pub size_bytes: u64,
55 pub checksum: u64,
57}
58
59pub struct SnapshotManager {
65 snapshot_dir: PathBuf,
66 uploader: Option<Arc<dyn SnapshotUploader>>,
67}
68
69impl SnapshotManager {
70 pub fn new(snapshot_dir: impl Into<PathBuf>) -> ServerResult<Self> {
74 let dir: PathBuf = snapshot_dir.into();
75 std::fs::create_dir_all(&dir)?;
76 Ok(Self {
77 snapshot_dir: dir,
78 uploader: None,
79 })
80 }
81
82 pub fn write_snapshot(&self, id: u64, data: &[u8]) -> ServerResult<SnapshotMeta> {
86 let compressed = oxiarc_lz4::compress(data)
87 .map_err(|e| ServerError::Storage(format!("Snapshot LZ4 compress failed: {}", e)))?;
88
89 let timestamp_ms = current_timestamp_ms();
90 let checksum = fnv64(&compressed);
91 let size_bytes = compressed.len() as u64;
92
93 let path = self.snapshot_path(id);
94 let mut file = std::fs::File::create(&path)?;
95
96 let mut header = [0u8; HEADER_LEN];
98 header[0..8].copy_from_slice(MAGIC);
99 header[8..16].copy_from_slice(&id.to_le_bytes());
100 header[16..24].copy_from_slice(×tamp_ms.to_le_bytes());
101 header[24..32].copy_from_slice(&(data.len() as u64).to_le_bytes());
102 header[32..40].copy_from_slice(&checksum.to_le_bytes());
103
104 file.write_all(&header)?;
105 file.write_all(&compressed)?;
106 file.sync_all()?;
107
108 Ok(SnapshotMeta {
109 id,
110 timestamp_ms,
111 size_bytes,
112 checksum,
113 })
114 }
115
116 pub fn read_snapshot(&self, id: u64) -> ServerResult<Vec<u8>> {
121 let path = self.snapshot_path(id);
122 let mut file = std::fs::File::open(&path)
123 .map_err(|e| ServerError::Storage(format!("Cannot open snapshot {}: {}", id, e)))?;
124
125 let mut header = [0u8; HEADER_LEN];
127 file.read_exact(&mut header).map_err(|e| {
128 ServerError::Storage(format!("Snapshot {} header read error: {}", id, e))
129 })?;
130
131 if &header[0..8] != MAGIC {
132 return Err(ServerError::Storage(format!(
133 "Snapshot {} has invalid magic bytes",
134 id
135 )));
136 }
137
138 let stored_id = u64::from_le_bytes(header[8..16].try_into().unwrap_or_default());
139 if stored_id != id {
140 return Err(ServerError::Storage(format!(
141 "Snapshot file id mismatch: expected {}, got {}",
142 id, stored_id
143 )));
144 }
145
146 let original_size =
147 u64::from_le_bytes(header[24..32].try_into().unwrap_or_default()) as usize;
148 let expected_checksum = u64::from_le_bytes(header[32..40].try_into().unwrap_or_default());
149
150 let mut compressed = Vec::new();
152 file.read_to_end(&mut compressed).map_err(|e| {
153 ServerError::Storage(format!("Snapshot {} payload read error: {}", id, e))
154 })?;
155
156 let actual_checksum = fnv64(&compressed);
158 if actual_checksum != expected_checksum {
159 return Err(ServerError::Storage(format!(
160 "Snapshot {} checksum mismatch: expected {:016x}, got {:016x}",
161 id, expected_checksum, actual_checksum
162 )));
163 }
164
165 let data = oxiarc_lz4::decompress(&compressed, original_size).map_err(|e| {
167 ServerError::Storage(format!("Snapshot {} LZ4 decompress failed: {}", id, e))
168 })?;
169
170 Ok(data)
171 }
172
173 pub fn list_snapshots(&self) -> ServerResult<Vec<SnapshotMeta>> {
175 let entries = std::fs::read_dir(&self.snapshot_dir)?;
176
177 let mut metas = Vec::new();
178 for entry in entries {
179 let entry = entry?;
180 let file_name = entry.file_name();
181 let name = file_name.to_string_lossy();
182
183 if !name.starts_with("snapshot-") || !name.ends_with(".bin") {
185 continue;
186 }
187
188 let meta = self.read_meta_from_path(&entry.path())?;
189 metas.push(meta);
190 }
191
192 metas.sort_by_key(|m| std::cmp::Reverse(m.id));
194 Ok(metas)
195 }
196
197 pub fn delete_snapshot(&self, id: u64) -> ServerResult<()> {
201 let path = self.snapshot_path(id);
202 std::fs::remove_file(&path)
203 .map_err(|e| ServerError::Storage(format!("Failed to delete snapshot {}: {}", id, e)))
204 }
205
206 fn snapshot_path(&self, id: u64) -> PathBuf {
208 self.snapshot_dir.join(format!("snapshot-{:020}.bin", id))
209 }
210
211 fn read_meta_from_path(&self, path: &Path) -> ServerResult<SnapshotMeta> {
214 let mut file = std::fs::File::open(path).map_err(|e| {
215 ServerError::Storage(format!(
216 "Cannot open snapshot file {}: {}",
217 path.display(),
218 e
219 ))
220 })?;
221
222 let mut header = [0u8; HEADER_LEN];
223 file.read_exact(&mut header).map_err(|e| {
224 ServerError::Storage(format!("Cannot read header from {}: {}", path.display(), e))
225 })?;
226
227 if &header[0..8] != MAGIC {
228 return Err(ServerError::Storage(format!(
229 "Invalid magic in {}",
230 path.display()
231 )));
232 }
233
234 let id = u64::from_le_bytes(header[8..16].try_into().unwrap_or_default());
235 let timestamp_ms = u64::from_le_bytes(header[16..24].try_into().unwrap_or_default());
236 let checksum = u64::from_le_bytes(header[32..40].try_into().unwrap_or_default());
237
238 let file_len = std::fs::metadata(path)?.len();
240 let size_bytes = file_len.saturating_sub(HEADER_LEN as u64);
241
242 Ok(SnapshotMeta {
243 id,
244 timestamp_ms,
245 size_bytes,
246 checksum,
247 })
248 }
249}
250
251fn current_timestamp_ms() -> u64 {
253 std::time::SystemTime::now()
254 .duration_since(std::time::UNIX_EPOCH)
255 .map(|d| d.as_millis() as u64)
256 .unwrap_or(0)
257}
258
259pub trait SnapshotUploader: Send + Sync {
267 fn upload_snapshot(&self, id: u64, data: &[u8]) -> ServerResult<String>;
269 fn download_snapshot(&self, uri: &str) -> ServerResult<Vec<u8>>;
271 fn list_remote_snapshots(&self) -> ServerResult<Vec<(u64, String)>>;
273}
274
275pub struct LocalSnapshotUploader {
279 base_path: PathBuf,
280}
281
282impl LocalSnapshotUploader {
283 pub fn new(base_path: impl Into<PathBuf>) -> ServerResult<Self> {
287 let base_path = base_path.into();
288 std::fs::create_dir_all(&base_path)?;
289 Ok(Self { base_path })
290 }
291
292 fn parse_id_from_name(name: &str) -> Option<u64> {
295 let stem = name
296 .strip_prefix("remote-snapshot-")?
297 .strip_suffix(".bin")?;
298 stem.parse::<u64>().ok()
299 }
300}
301
302impl SnapshotUploader for LocalSnapshotUploader {
303 fn upload_snapshot(&self, id: u64, data: &[u8]) -> ServerResult<String> {
304 let path = self
305 .base_path
306 .join(format!("remote-snapshot-{:020}.bin", id));
307 std::fs::write(&path, data)?;
308 Ok(format!("local://{}", path.display()))
309 }
310
311 fn download_snapshot(&self, uri: &str) -> ServerResult<Vec<u8>> {
312 let path_str = uri
313 .strip_prefix("local://")
314 .ok_or_else(|| ServerError::Storage(format!("Unsupported URI scheme in '{}'", uri)))?;
315 std::fs::read(path_str)
316 .map_err(|e| ServerError::Storage(format!("Failed to download '{}': {}", uri, e)))
317 }
318
319 fn list_remote_snapshots(&self) -> ServerResult<Vec<(u64, String)>> {
320 let entries = std::fs::read_dir(&self.base_path)?;
321 let mut snapshots = Vec::new();
322 for entry in entries {
323 let entry = entry?;
324 let file_name = entry.file_name();
325 let name = file_name.to_string_lossy();
326 if let Some(id) = Self::parse_id_from_name(&name) {
327 let uri = format!("local://{}", entry.path().display());
328 snapshots.push((id, uri));
329 }
330 }
331 snapshots.sort_by_key(|(id, _)| *id);
332 Ok(snapshots)
333 }
334}
335
336impl SnapshotManager {
337 pub fn set_uploader(&mut self, uploader: Arc<dyn SnapshotUploader>) {
339 self.uploader = Some(uploader);
340 }
341
342 pub fn upload(&self, id: u64) -> ServerResult<String> {
347 let uploader = self.uploader.as_ref().ok_or_else(|| {
348 ServerError::Storage("No uploader configured on SnapshotManager".to_string())
349 })?;
350 let data = self.read_snapshot(id)?;
351 uploader.upload_snapshot(id, &data)
352 }
353
354 pub fn restore_from_remote(&self, uri: &str, local_id: u64) -> ServerResult<SnapshotMeta> {
358 let uploader = self.uploader.as_ref().ok_or_else(|| {
359 ServerError::Storage("No uploader configured on SnapshotManager".to_string())
360 })?;
361 let data = uploader.download_snapshot(uri)?;
362 self.write_snapshot(local_id, &data)
363 }
364}
365
366#[cfg(test)]
369mod tests {
370 use super::*;
371
372 fn temp_dir(suffix: &str) -> PathBuf {
373 let dir = std::env::temp_dir().join(format!("amaters_snapshot_test_{}", suffix));
374 std::fs::create_dir_all(&dir).expect("create temp dir");
375 dir
376 }
377
378 #[test]
379 fn test_write_and_read_snapshot() {
380 let dir = temp_dir("write_read");
381 let sm = SnapshotManager::new(&dir).expect("create manager");
382
383 let payload: Vec<u8> = (0u8..=255).cycle().take(1024 * 1024).collect();
385 let meta = sm.write_snapshot(42, &payload).expect("write");
386 assert_eq!(meta.id, 42);
387 assert!(meta.size_bytes > 0);
388
389 let restored = sm.read_snapshot(42).expect("read");
390 assert_eq!(restored, payload);
391
392 std::fs::remove_dir_all(&dir).ok();
393 }
394
395 #[test]
396 fn test_list_snapshots_sorted() {
397 let dir = temp_dir("list_sorted");
398 let sm = SnapshotManager::new(&dir).expect("create manager");
399
400 sm.write_snapshot(10, b"alpha").expect("write 10");
401 sm.write_snapshot(30, b"gamma").expect("write 30");
402 sm.write_snapshot(20, b"beta").expect("write 20");
403
404 let list = sm.list_snapshots().expect("list");
405 assert_eq!(list.len(), 3);
406 assert_eq!(list[0].id, 30);
408 assert_eq!(list[1].id, 20);
409 assert_eq!(list[2].id, 10);
410
411 std::fs::remove_dir_all(&dir).ok();
412 }
413
414 #[test]
415 fn test_delete_snapshot() {
416 let dir = temp_dir("delete");
417 let sm = SnapshotManager::new(&dir).expect("create manager");
418
419 sm.write_snapshot(1, b"to be deleted").expect("write");
420 sm.write_snapshot(2, b"to be kept").expect("write");
421
422 sm.delete_snapshot(1).expect("delete");
423
424 let list = sm.list_snapshots().expect("list");
425 assert_eq!(list.len(), 1);
426 assert_eq!(list[0].id, 2);
427
428 assert!(sm.read_snapshot(1).is_err());
430
431 std::fs::remove_dir_all(&dir).ok();
432 }
433
434 #[test]
435 fn test_snapshot_checksum_corruption_detected() {
436 let dir = temp_dir("checksum");
437 let sm = SnapshotManager::new(&dir).expect("create manager");
438
439 sm.write_snapshot(99, b"important data").expect("write");
440
441 let path = dir.join("snapshot-00000000000000000099.bin");
443 let mut contents = std::fs::read(&path).expect("read file");
444 if contents.len() > HEADER_LEN + 4 {
446 contents[HEADER_LEN + 4] ^= 0xFF;
447 }
448 std::fs::write(&path, &contents).expect("write corrupted file");
449
450 let result = sm.read_snapshot(99);
451 assert!(
452 result.is_err(),
453 "Should detect checksum mismatch after corruption"
454 );
455
456 std::fs::remove_dir_all(&dir).ok();
457 }
458
459 #[test]
460 fn test_fnv64_deterministic() {
461 let a = fnv64(b"hello");
462 let b = fnv64(b"hello");
463 assert_eq!(a, b);
464 assert_ne!(fnv64(b"hello"), fnv64(b"world"));
465 }
466
467 #[test]
468 fn test_local_uploader_round_trip() {
469 let local_dir = temp_dir("uploader_local");
470 let remote_dir = temp_dir("uploader_remote");
471
472 let mut sm = SnapshotManager::new(&local_dir).expect("create manager");
473 let uploader = LocalSnapshotUploader::new(&remote_dir).expect("create uploader");
474 sm.set_uploader(std::sync::Arc::new(uploader));
475
476 let payload = b"round-trip test data 1234567890";
477 sm.write_snapshot(777, payload)
478 .expect("write local snapshot");
479
480 let uri = sm.upload(777).expect("upload");
481 assert!(uri.starts_with("local://"), "URI: {}", uri);
482
483 let meta = sm
485 .restore_from_remote(&uri, 888)
486 .expect("restore from remote");
487 assert_eq!(meta.id, 888);
488
489 let restored = sm.read_snapshot(888).expect("read restored");
490 assert_eq!(restored.as_slice(), payload.as_ref());
491
492 std::fs::remove_dir_all(&local_dir).ok();
493 std::fs::remove_dir_all(&remote_dir).ok();
494 }
495
496 #[test]
497 fn test_upload_requires_uploader_set() {
498 let dir = temp_dir("no_uploader");
499 let sm = SnapshotManager::new(&dir).expect("create manager");
500 sm.write_snapshot(1, b"data").expect("write");
501 let result = sm.upload(1);
502 assert!(result.is_err(), "upload without uploader should fail");
503 std::fs::remove_dir_all(&dir).ok();
504 }
505}