use std::collections::HashSet;
use std::fs::{self, File};
use std::io::{BufReader, BufWriter, ErrorKind};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use common::{DakeraError, NamespaceId, Result, Vector};
use serde::{Deserialize, Serialize};

use crate::traits::VectorStorage;
19
/// Process-wide counter appended to generated snapshot ids, so that ids created
/// within the same millisecond remain distinct.
static SNAPSHOT_COUNTER: AtomicU64 = AtomicU64::new(0);
22
/// Settings controlling where snapshots are stored and how many are retained.
#[derive(Clone, Debug)]
pub struct SnapshotConfig {
    /// Directory holding `<id>.snap` data files and `<id>.meta` metadata files.
    pub snapshot_dir: PathBuf,
    /// Number of most recent snapshots kept by automatic retention; older ones
    /// are removed unless an incremental snapshot still references them.
    pub max_snapshots: usize,
    /// Selects compact JSON (`true`) or pretty-printed JSON (`false`) for
    /// `.snap` files. No byte-level compression is applied.
    pub compression_enabled: bool,
    /// NOTE(review): not read by `SnapshotManager` in this file — confirm the
    /// intended use before relying on it.
    pub include_metadata: bool,
}

impl Default for SnapshotConfig {
    /// Stores snapshots under `./data/snapshots`, keeps 10 of them, writes
    /// compact JSON and leaves `include_metadata` enabled.
    fn default() -> Self {
        let snapshot_dir: PathBuf = "./data/snapshots".into();
        SnapshotConfig {
            snapshot_dir,
            max_snapshots: 10,
            compression_enabled: true,
            include_metadata: true,
        }
    }
}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct SnapshotMetadata {
50 pub id: String,
52 pub created_at: u64,
54 pub description: Option<String>,
56 pub namespaces: Vec<String>,
58 pub total_vectors: u64,
60 pub size_bytes: u64,
62 pub snapshot_type: SnapshotType,
64 pub parent_id: Option<String>,
66 pub version: String,
68}
69
70#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
72pub enum SnapshotType {
73 Full,
75 Incremental,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct NamespaceSnapshot {
82 pub namespace: String,
84 pub vector_count: usize,
86 pub dimension: Option<usize>,
88 pub vectors: Vec<SerializedVector>,
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct SerializedVector {
95 pub id: String,
96 pub values: Vec<f32>,
97 pub metadata: Option<serde_json::Value>,
98}
99
100impl From<&Vector> for SerializedVector {
101 fn from(v: &Vector) -> Self {
102 Self {
103 id: v.id.clone(),
104 values: v.values.clone(),
105 metadata: v.metadata.clone(),
106 }
107 }
108}
109
110impl From<SerializedVector> for Vector {
111 fn from(sv: SerializedVector) -> Self {
112 Vector {
113 id: sv.id,
114 values: sv.values,
115 metadata: sv.metadata,
116 ttl_seconds: None,
117 expires_at: None,
118 }
119 }
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct SnapshotData {
125 pub metadata: SnapshotMetadata,
127 pub namespaces: Vec<NamespaceSnapshot>,
129}
130
131pub struct SnapshotManager {
133 config: SnapshotConfig,
134}
135
136impl SnapshotManager {
137 pub fn new(config: SnapshotConfig) -> Result<Self> {
139 fs::create_dir_all(&config.snapshot_dir)
141 .map_err(|e| DakeraError::Storage(format!("Failed to create snapshot dir: {}", e)))?;
142
143 Ok(Self { config })
144 }
145
146 pub async fn create_snapshot<S: VectorStorage>(
148 &self,
149 storage: &S,
150 description: Option<String>,
151 ) -> Result<SnapshotMetadata> {
152 let snapshot_id = self.generate_snapshot_id();
153 let created_at = SystemTime::now()
154 .duration_since(UNIX_EPOCH)
155 .expect("system clock is before UNIX epoch")
156 .as_secs();
157
158 let namespaces = storage.list_namespaces().await?;
160
161 let mut namespace_snapshots = Vec::new();
162 let mut total_vectors = 0u64;
163
164 for namespace in &namespaces {
165 let vectors = storage.get_all(namespace).await?;
166 let dimension = storage.dimension(namespace).await?;
167
168 total_vectors += vectors.len() as u64;
169
170 let serialized: Vec<SerializedVector> =
171 vectors.iter().map(SerializedVector::from).collect();
172
173 namespace_snapshots.push(NamespaceSnapshot {
174 namespace: namespace.clone(),
175 vector_count: serialized.len(),
176 dimension,
177 vectors: serialized,
178 });
179 }
180
181 let metadata = SnapshotMetadata {
182 id: snapshot_id.clone(),
183 created_at,
184 description,
185 namespaces: namespaces.clone(),
186 total_vectors,
187 size_bytes: 0, snapshot_type: SnapshotType::Full,
189 parent_id: None,
190 version: "1.0.0".to_string(),
191 };
192
193 let snapshot_data = SnapshotData {
194 metadata: metadata.clone(),
195 namespaces: namespace_snapshots,
196 };
197
198 let size_bytes = self.save_snapshot(&snapshot_id, &snapshot_data)?;
200
201 let mut final_metadata = metadata;
203 final_metadata.size_bytes = size_bytes;
204
205 self.save_metadata(&snapshot_id, &final_metadata)?;
207
208 self.cleanup_old_snapshots()?;
210
211 Ok(final_metadata)
212 }
213
214 pub async fn create_incremental_snapshot<S: VectorStorage>(
216 &self,
217 storage: &S,
218 parent_id: &str,
219 changed_namespaces: &[NamespaceId],
220 description: Option<String>,
221 ) -> Result<SnapshotMetadata> {
222 if !self.snapshot_exists(parent_id) {
224 return Err(DakeraError::Storage(format!(
225 "Parent snapshot not found: {}",
226 parent_id
227 )));
228 }
229
230 let snapshot_id = self.generate_snapshot_id();
231 let created_at = SystemTime::now()
232 .duration_since(UNIX_EPOCH)
233 .expect("system clock is before UNIX epoch")
234 .as_secs();
235
236 let mut namespace_snapshots = Vec::new();
237 let mut total_vectors = 0u64;
238
239 for namespace in changed_namespaces {
241 let vectors = storage.get_all(namespace).await?;
242 let dimension = storage.dimension(namespace).await?;
243
244 total_vectors += vectors.len() as u64;
245
246 let serialized: Vec<SerializedVector> =
247 vectors.iter().map(SerializedVector::from).collect();
248
249 namespace_snapshots.push(NamespaceSnapshot {
250 namespace: namespace.clone(),
251 vector_count: serialized.len(),
252 dimension,
253 vectors: serialized,
254 });
255 }
256
257 let metadata = SnapshotMetadata {
258 id: snapshot_id.clone(),
259 created_at,
260 description,
261 namespaces: changed_namespaces.to_vec(),
262 total_vectors,
263 size_bytes: 0,
264 snapshot_type: SnapshotType::Incremental,
265 parent_id: Some(parent_id.to_string()),
266 version: "1.0.0".to_string(),
267 };
268
269 let snapshot_data = SnapshotData {
270 metadata: metadata.clone(),
271 namespaces: namespace_snapshots,
272 };
273
274 let size_bytes = self.save_snapshot(&snapshot_id, &snapshot_data)?;
275
276 let mut final_metadata = metadata;
277 final_metadata.size_bytes = size_bytes;
278
279 self.save_metadata(&snapshot_id, &final_metadata)?;
280 self.cleanup_old_snapshots()?;
281
282 Ok(final_metadata)
283 }
284
285 pub async fn restore_snapshot<S: VectorStorage>(
287 &self,
288 storage: &S,
289 snapshot_id: &str,
290 ) -> Result<RestoreResult> {
291 let snapshot_data = self.load_snapshot(snapshot_id)?;
292
293 let mut namespaces_restored = 0;
294 let mut vectors_restored = 0u64;
295
296 if snapshot_data.metadata.snapshot_type == SnapshotType::Incremental {
298 if let Some(parent_id) = &snapshot_data.metadata.parent_id {
299 let parent_result = Box::pin(self.restore_snapshot(storage, parent_id)).await?;
301 namespaces_restored += parent_result.namespaces_restored;
302 vectors_restored += parent_result.vectors_restored;
303 }
304 }
305
306 for ns_snapshot in &snapshot_data.namespaces {
308 storage.ensure_namespace(&ns_snapshot.namespace).await?;
309
310 let vectors: Vec<Vector> = ns_snapshot
311 .vectors
312 .iter()
313 .cloned()
314 .map(Vector::from)
315 .collect();
316
317 storage.upsert(&ns_snapshot.namespace, vectors).await?;
318
319 namespaces_restored += 1;
320 vectors_restored += ns_snapshot.vector_count as u64;
321 }
322
323 Ok(RestoreResult {
324 snapshot_id: snapshot_id.to_string(),
325 namespaces_restored,
326 vectors_restored,
327 })
328 }
329
330 pub fn list_snapshots(&self) -> Result<Vec<SnapshotMetadata>> {
332 let mut snapshots = Vec::new();
333
334 if let Ok(entries) = fs::read_dir(&self.config.snapshot_dir) {
335 for entry in entries.flatten() {
336 let path = entry.path();
337 if path.extension().map(|e| e == "meta").unwrap_or(false) {
338 if let Ok(metadata) = self.load_metadata_from_path(&path) {
339 snapshots.push(metadata);
340 }
341 }
342 }
343 }
344
345 snapshots.sort_by(|a, b| b.created_at.cmp(&a.created_at));
347
348 Ok(snapshots)
349 }
350
351 pub fn get_snapshot_metadata(&self, snapshot_id: &str) -> Result<SnapshotMetadata> {
353 let meta_path = self.metadata_path(snapshot_id);
354 self.load_metadata_from_path(&meta_path)
355 }
356
357 pub fn delete_snapshot(&self, snapshot_id: &str) -> Result<bool> {
359 let snapshot_path = self.snapshot_path(snapshot_id);
360 let meta_path = self.metadata_path(snapshot_id);
361
362 let mut deleted = false;
363
364 if snapshot_path.exists() {
365 if let Err(e) = fs::remove_file(&snapshot_path) {
366 tracing::warn!(
367 path = %snapshot_path.display(),
368 error = %e,
369 "Failed to remove snapshot file"
370 );
371 } else {
372 deleted = true;
373 }
374 }
375
376 if meta_path.exists() {
377 if let Err(e) = fs::remove_file(&meta_path) {
378 tracing::warn!(
379 path = %meta_path.display(),
380 error = %e,
381 "Failed to remove snapshot metadata file"
382 );
383 } else {
384 deleted = true;
385 }
386 }
387
388 Ok(deleted)
389 }
390
391 pub fn snapshot_exists(&self, snapshot_id: &str) -> bool {
393 self.snapshot_path(snapshot_id).exists()
394 }
395
396 fn generate_snapshot_id(&self) -> String {
399 let timestamp = SystemTime::now()
400 .duration_since(UNIX_EPOCH)
401 .expect("system clock is before UNIX epoch")
402 .as_millis();
403 let counter = SNAPSHOT_COUNTER.fetch_add(1, Ordering::Relaxed);
404 format!("snap_{}_{}", timestamp, counter)
405 }
406
407 fn snapshot_path(&self, snapshot_id: &str) -> PathBuf {
408 self.config
409 .snapshot_dir
410 .join(format!("{}.snap", snapshot_id))
411 }
412
413 fn metadata_path(&self, snapshot_id: &str) -> PathBuf {
414 self.config
415 .snapshot_dir
416 .join(format!("{}.meta", snapshot_id))
417 }
418
419 fn save_snapshot(&self, snapshot_id: &str, data: &SnapshotData) -> Result<u64> {
420 let path = self.snapshot_path(snapshot_id);
421 let file = File::create(&path)
422 .map_err(|e| DakeraError::Storage(format!("Failed to create snapshot: {}", e)))?;
423
424 let writer = BufWriter::new(file);
425
426 if self.config.compression_enabled {
427 serde_json::to_writer(writer, data)
429 .map_err(|e| DakeraError::Storage(format!("Snapshot serialize error: {}", e)))?;
430 } else {
431 serde_json::to_writer_pretty(writer, data)
432 .map_err(|e| DakeraError::Storage(format!("Snapshot serialize error: {}", e)))?;
433 }
434
435 let metadata = fs::metadata(&path)
437 .map_err(|e| DakeraError::Storage(format!("Failed to get snapshot size: {}", e)))?;
438
439 Ok(metadata.len())
440 }
441
442 fn load_snapshot(&self, snapshot_id: &str) -> Result<SnapshotData> {
443 let path = self.snapshot_path(snapshot_id);
444 let file = File::open(&path)
445 .map_err(|e| DakeraError::Storage(format!("Failed to open snapshot: {}", e)))?;
446
447 let reader = BufReader::new(file);
448
449 serde_json::from_reader(reader)
450 .map_err(|e| DakeraError::Storage(format!("Snapshot deserialize error: {}", e)))
451 }
452
453 fn save_metadata(&self, snapshot_id: &str, metadata: &SnapshotMetadata) -> Result<()> {
454 let path = self.metadata_path(snapshot_id);
455 let file = File::create(&path)
456 .map_err(|e| DakeraError::Storage(format!("Failed to create metadata: {}", e)))?;
457
458 let writer = BufWriter::new(file);
459
460 serde_json::to_writer_pretty(writer, metadata)
461 .map_err(|e| DakeraError::Storage(format!("Metadata serialize error: {}", e)))?;
462
463 Ok(())
464 }
465
466 fn load_metadata_from_path(&self, path: &Path) -> Result<SnapshotMetadata> {
467 let file = File::open(path)
468 .map_err(|e| DakeraError::Storage(format!("Failed to open metadata: {}", e)))?;
469
470 let reader = BufReader::new(file);
471
472 serde_json::from_reader(reader)
473 .map_err(|e| DakeraError::Storage(format!("Metadata deserialize error: {}", e)))
474 }
475
476 fn cleanup_old_snapshots(&self) -> Result<()> {
477 let mut snapshots = self.list_snapshots()?;
478
479 if snapshots.len() > self.config.max_snapshots {
481 let to_remove = snapshots.split_off(self.config.max_snapshots);
483
484 let mut deleted_ids = std::collections::HashSet::new();
486
487 for snapshot in &to_remove {
488 let is_parent_of_kept = snapshots
490 .iter()
491 .any(|s| s.parent_id.as_ref() == Some(&snapshot.id));
492 let is_parent_of_remaining = to_remove.iter().any(|s| {
494 s.parent_id.as_ref() == Some(&snapshot.id) && !deleted_ids.contains(&s.id)
495 });
496
497 if !is_parent_of_kept && !is_parent_of_remaining {
498 self.delete_snapshot(&snapshot.id)?;
499 deleted_ids.insert(snapshot.id.clone());
500 }
501 }
502 }
503
504 Ok(())
505 }
506}
507
/// Summary returned by [`SnapshotManager::restore_snapshot`].
#[derive(Clone, Debug)]
pub struct RestoreResult {
    /// Id of the snapshot whose restore was requested.
    pub snapshot_id: String,
    /// Namespace restores performed, summed over every snapshot in an
    /// incremental chain.
    pub namespaces_restored: usize,
    /// Vectors upserted, summed over every snapshot in an incremental chain.
    pub vectors_restored: u64,
}
518
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::InMemoryStorage;
    use tempfile::TempDir;

    /// Config rooted at `dir` that writes pretty-printed JSON and keeps 5 snapshots.
    fn test_config(dir: &Path) -> SnapshotConfig {
        SnapshotConfig {
            snapshot_dir: dir.to_path_buf(),
            max_snapshots: 5,
            compression_enabled: false,
            include_metadata: true,
        }
    }

    /// A `dim`-dimensional vector of ones with no metadata and no TTL.
    fn create_test_vector(id: &str, dim: usize) -> Vector {
        Vector {
            id: id.to_string(),
            values: vec![1.0; dim],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    /// In-memory storage whose `test` namespace holds 4-dimensional vectors with `ids`.
    async fn seeded_storage(ids: &[&str]) -> InMemoryStorage {
        let storage = InMemoryStorage::new();
        let ns = "test".to_string();
        storage.ensure_namespace(&ns).await.unwrap();
        storage
            .upsert(&ns, ids.iter().map(|id| create_test_vector(id, 4)).collect())
            .await
            .unwrap();
        storage
    }

    #[tokio::test]
    async fn test_create_snapshot() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SnapshotManager::new(test_config(temp_dir.path())).unwrap();
        let storage = seeded_storage(&["v1", "v2"]).await;

        let metadata = manager
            .create_snapshot(&storage, Some("Test snapshot".to_string()))
            .await
            .unwrap();

        assert_eq!(metadata.total_vectors, 2);
        assert_eq!(metadata.namespaces.len(), 1);
        assert_eq!(metadata.snapshot_type, SnapshotType::Full);
        assert_eq!(metadata.description.as_deref(), Some("Test snapshot"));
        assert!(metadata.parent_id.is_none());
        assert!(metadata.size_bytes > 0);
        assert!(manager.snapshot_exists(&metadata.id));

        // The persisted `.meta` file agrees with the returned metadata.
        let stored = manager.get_snapshot_metadata(&metadata.id).unwrap();
        assert_eq!(stored.id, metadata.id);
        assert_eq!(stored.size_bytes, metadata.size_bytes);
    }

    #[tokio::test]
    async fn test_restore_snapshot() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SnapshotManager::new(test_config(temp_dir.path())).unwrap();
        let storage = seeded_storage(&["v1"]).await;
        let ns = "test".to_string();

        let metadata = manager.create_snapshot(&storage, None).await.unwrap();

        storage.delete(&ns, &["v1".to_string()]).await.unwrap();
        assert_eq!(storage.count(&ns).await.unwrap(), 0);

        let result = manager
            .restore_snapshot(&storage, &metadata.id)
            .await
            .unwrap();

        assert_eq!(result.snapshot_id, metadata.id);
        assert_eq!(result.namespaces_restored, 1);
        assert_eq!(result.vectors_restored, 1);
        assert_eq!(storage.count(&ns).await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_incremental_snapshot_restores_parent_chain() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SnapshotManager::new(test_config(temp_dir.path())).unwrap();
        let storage = seeded_storage(&["v1"]).await;
        let ns = "test".to_string();

        let base = manager.create_snapshot(&storage, None).await.unwrap();

        storage
            .upsert(&ns, vec![create_test_vector("v2", 4)])
            .await
            .unwrap();
        let incremental = manager
            .create_incremental_snapshot(
                &storage,
                &base.id,
                &[ns.clone()],
                Some("delta".to_string()),
            )
            .await
            .unwrap();

        assert_eq!(incremental.snapshot_type, SnapshotType::Incremental);
        assert_eq!(incremental.parent_id.as_deref(), Some(base.id.as_str()));
        assert_eq!(incremental.total_vectors, 2);

        storage
            .delete(&ns, &["v1".to_string(), "v2".to_string()])
            .await
            .unwrap();
        assert_eq!(storage.count(&ns).await.unwrap(), 0);

        let result = manager
            .restore_snapshot(&storage, &incremental.id)
            .await
            .unwrap();

        // The parent (1 vector) and the child (2 vectors) are both applied.
        assert_eq!(result.snapshot_id, incremental.id);
        assert_eq!(result.namespaces_restored, 2);
        assert_eq!(result.vectors_restored, 3);
        assert_eq!(storage.count(&ns).await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_incremental_snapshot_requires_existing_parent() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SnapshotManager::new(test_config(temp_dir.path())).unwrap();
        let storage = seeded_storage(&["v1"]).await;

        let result = manager
            .create_incremental_snapshot(&storage, "snap_0_0", &["test".to_string()], None)
            .await;

        assert!(result.is_err());
        assert!(manager.list_snapshots().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_list_snapshots() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SnapshotManager::new(test_config(temp_dir.path())).unwrap();
        let storage = seeded_storage(&["v1"]).await;

        for i in 0..3 {
            manager
                .create_snapshot(&storage, Some(format!("Snapshot {}", i)))
                .await
                .unwrap();
        }

        let snapshots = manager.list_snapshots().unwrap();
        assert_eq!(snapshots.len(), 3);
        assert!(snapshots
            .windows(2)
            .all(|pair| pair[0].created_at >= pair[1].created_at));

        let mut descriptions: Vec<String> = snapshots
            .iter()
            .filter_map(|s| s.description.clone())
            .collect();
        descriptions.sort();
        assert_eq!(descriptions, ["Snapshot 0", "Snapshot 1", "Snapshot 2"]);
    }

    #[tokio::test]
    async fn test_delete_snapshot() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SnapshotManager::new(test_config(temp_dir.path())).unwrap();
        let storage = seeded_storage(&["v1"]).await;

        let metadata = manager.create_snapshot(&storage, None).await.unwrap();
        assert!(manager.snapshot_exists(&metadata.id));

        assert!(manager.delete_snapshot(&metadata.id).unwrap());

        assert!(!manager.snapshot_exists(&metadata.id));
        assert!(manager.get_snapshot_metadata(&metadata.id).is_err());
        // Deleting again finds nothing to remove.
        assert!(!manager.delete_snapshot(&metadata.id).unwrap());
    }

    #[tokio::test]
    async fn test_snapshot_cleanup() {
        let temp_dir = TempDir::new().unwrap();
        let mut config = test_config(temp_dir.path());
        config.max_snapshots = 3;
        let manager = SnapshotManager::new(config).unwrap();
        let storage = seeded_storage(&["v1"]).await;

        for _ in 0..5 {
            manager.create_snapshot(&storage, None).await.unwrap();
        }

        // Only full snapshots exist, so none is protected as a parent and
        // retention trims the set to exactly the limit.
        let snapshots = manager.list_snapshots().unwrap();
        assert_eq!(snapshots.len(), 3);
    }
}