// storage/snapshot.rs

//! Snapshot management for Buffer.
//!
//! Provides point-in-time snapshots for backup and recovery:
//! - Create full snapshots covering every namespace
//! - Create incremental snapshots of changed namespaces, linked to a parent snapshot
//! - Restore from snapshots (an incremental restore replays its parent chain first)
//! - Snapshot lifecycle management (listing, deletion, retention cleanup)

use common::{DakeraError, NamespaceId, Result, Vector};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fs::{self, File};
use std::io::{BufReader, BufWriter};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

use crate::traits::VectorStorage;

/// Process-wide sequence number appended to every generated snapshot ID, so
/// that IDs minted within the same millisecond still differ.
static SNAPSHOT_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Settings for a [`SnapshotManager`].
#[derive(Clone, Debug)]
pub struct SnapshotConfig {
    /// Directory holding the `<id>.snap` data files and `<id>.meta` metadata files.
    pub snapshot_dir: PathBuf,
    /// How many of the newest snapshots retention cleanup keeps. Older ones are
    /// deleted unless a retained incremental snapshot still depends on them.
    pub max_snapshots: usize,
    /// `true` writes snapshot data as compact JSON, `false` as pretty-printed
    /// JSON. No byte-level compression is applied.
    pub compression_enabled: bool,
    /// Include metadata in snapshots.
    ///
    /// NOTE(review): not read anywhere in this module; vector metadata is
    /// always captured. Confirm the intended semantics.
    pub include_metadata: bool,
}

36impl Default for SnapshotConfig {
37    fn default() -> Self {
38        Self {
39            snapshot_dir: PathBuf::from("./data/snapshots"),
40            max_snapshots: 10,
41            compression_enabled: true,
42            include_metadata: true,
43        }
44    }
45}
46
47/// Snapshot metadata
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct SnapshotMetadata {
50    /// Unique snapshot ID
51    pub id: String,
52    /// Creation timestamp (Unix seconds)
53    pub created_at: u64,
54    /// Optional description
55    pub description: Option<String>,
56    /// Namespaces included
57    pub namespaces: Vec<String>,
58    /// Total vector count
59    pub total_vectors: u64,
60    /// Snapshot size in bytes
61    pub size_bytes: u64,
62    /// Snapshot type
63    pub snapshot_type: SnapshotType,
64    /// Parent snapshot ID (for incremental)
65    pub parent_id: Option<String>,
66    /// Version information
67    pub version: String,
68}
69
70/// Type of snapshot
71#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
72pub enum SnapshotType {
73    /// Full snapshot of all data
74    Full,
75    /// Incremental from parent
76    Incremental,
77}
78
79/// Serialized namespace data for snapshots
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct NamespaceSnapshot {
82    /// Namespace identifier
83    pub namespace: String,
84    /// Vector count
85    pub vector_count: usize,
86    /// Vector dimension
87    pub dimension: Option<usize>,
88    /// Serialized vectors
89    pub vectors: Vec<SerializedVector>,
90}
91
92/// Serialized vector for snapshot storage
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct SerializedVector {
95    pub id: String,
96    pub values: Vec<f32>,
97    pub metadata: Option<serde_json::Value>,
98}
99
100impl From<&Vector> for SerializedVector {
101    fn from(v: &Vector) -> Self {
102        Self {
103            id: v.id.clone(),
104            values: v.values.clone(),
105            metadata: v.metadata.clone(),
106        }
107    }
108}
109
110impl From<SerializedVector> for Vector {
111    fn from(sv: SerializedVector) -> Self {
112        Vector {
113            id: sv.id,
114            values: sv.values,
115            metadata: sv.metadata,
116            ttl_seconds: None,
117            expires_at: None,
118        }
119    }
120}
121
122/// Full snapshot data
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct SnapshotData {
125    /// Snapshot metadata
126    pub metadata: SnapshotMetadata,
127    /// Namespace snapshots
128    pub namespaces: Vec<NamespaceSnapshot>,
129}
130
131/// Snapshot manager
132pub struct SnapshotManager {
133    config: SnapshotConfig,
134}
135
136impl SnapshotManager {
137    /// Create a new snapshot manager
138    pub fn new(config: SnapshotConfig) -> Result<Self> {
139        // Ensure snapshot directory exists
140        fs::create_dir_all(&config.snapshot_dir)
141            .map_err(|e| DakeraError::Storage(format!("Failed to create snapshot dir: {}", e)))?;
142
143        Ok(Self { config })
144    }
145
146    /// Create a full snapshot from storage
147    pub async fn create_snapshot<S: VectorStorage>(
148        &self,
149        storage: &S,
150        description: Option<String>,
151    ) -> Result<SnapshotMetadata> {
152        let snapshot_id = self.generate_snapshot_id();
153        let created_at = SystemTime::now()
154            .duration_since(UNIX_EPOCH)
155            .expect("system clock is before UNIX epoch")
156            .as_secs();
157
158        // Get all namespaces
159        let namespaces = storage.list_namespaces().await?;
160
161        let mut namespace_snapshots = Vec::new();
162        let mut total_vectors = 0u64;
163
164        for namespace in &namespaces {
165            let vectors = storage.get_all(namespace).await?;
166            let dimension = storage.dimension(namespace).await?;
167
168            total_vectors += vectors.len() as u64;
169
170            let serialized: Vec<SerializedVector> =
171                vectors.iter().map(SerializedVector::from).collect();
172
173            namespace_snapshots.push(NamespaceSnapshot {
174                namespace: namespace.clone(),
175                vector_count: serialized.len(),
176                dimension,
177                vectors: serialized,
178            });
179        }
180
181        let metadata = SnapshotMetadata {
182            id: snapshot_id.clone(),
183            created_at,
184            description,
185            namespaces: namespaces.clone(),
186            total_vectors,
187            size_bytes: 0, // Will be updated after serialization
188            snapshot_type: SnapshotType::Full,
189            parent_id: None,
190            version: "1.0.0".to_string(),
191        };
192
193        let snapshot_data = SnapshotData {
194            metadata: metadata.clone(),
195            namespaces: namespace_snapshots,
196        };
197
198        // Save snapshot
199        let size_bytes = self.save_snapshot(&snapshot_id, &snapshot_data)?;
200
201        // Update metadata with actual size
202        let mut final_metadata = metadata;
203        final_metadata.size_bytes = size_bytes;
204
205        // Save metadata separately for quick listing
206        self.save_metadata(&snapshot_id, &final_metadata)?;
207
208        // Cleanup old snapshots
209        self.cleanup_old_snapshots()?;
210
211        Ok(final_metadata)
212    }
213
214    /// Create an incremental snapshot
215    pub async fn create_incremental_snapshot<S: VectorStorage>(
216        &self,
217        storage: &S,
218        parent_id: &str,
219        changed_namespaces: &[NamespaceId],
220        description: Option<String>,
221    ) -> Result<SnapshotMetadata> {
222        // Verify parent exists
223        if !self.snapshot_exists(parent_id) {
224            return Err(DakeraError::Storage(format!(
225                "Parent snapshot not found: {}",
226                parent_id
227            )));
228        }
229
230        let snapshot_id = self.generate_snapshot_id();
231        let created_at = SystemTime::now()
232            .duration_since(UNIX_EPOCH)
233            .expect("system clock is before UNIX epoch")
234            .as_secs();
235
236        let mut namespace_snapshots = Vec::new();
237        let mut total_vectors = 0u64;
238
239        // Only snapshot changed namespaces
240        for namespace in changed_namespaces {
241            let vectors = storage.get_all(namespace).await?;
242            let dimension = storage.dimension(namespace).await?;
243
244            total_vectors += vectors.len() as u64;
245
246            let serialized: Vec<SerializedVector> =
247                vectors.iter().map(SerializedVector::from).collect();
248
249            namespace_snapshots.push(NamespaceSnapshot {
250                namespace: namespace.clone(),
251                vector_count: serialized.len(),
252                dimension,
253                vectors: serialized,
254            });
255        }
256
257        let metadata = SnapshotMetadata {
258            id: snapshot_id.clone(),
259            created_at,
260            description,
261            namespaces: changed_namespaces.to_vec(),
262            total_vectors,
263            size_bytes: 0,
264            snapshot_type: SnapshotType::Incremental,
265            parent_id: Some(parent_id.to_string()),
266            version: "1.0.0".to_string(),
267        };
268
269        let snapshot_data = SnapshotData {
270            metadata: metadata.clone(),
271            namespaces: namespace_snapshots,
272        };
273
274        let size_bytes = self.save_snapshot(&snapshot_id, &snapshot_data)?;
275
276        let mut final_metadata = metadata;
277        final_metadata.size_bytes = size_bytes;
278
279        self.save_metadata(&snapshot_id, &final_metadata)?;
280        self.cleanup_old_snapshots()?;
281
282        Ok(final_metadata)
283    }
284
285    /// Restore from a snapshot
286    pub async fn restore_snapshot<S: VectorStorage>(
287        &self,
288        storage: &S,
289        snapshot_id: &str,
290    ) -> Result<RestoreResult> {
291        let snapshot_data = self.load_snapshot(snapshot_id)?;
292
293        let mut namespaces_restored = 0;
294        let mut vectors_restored = 0u64;
295
296        // For incremental snapshots, first restore parent chain
297        if snapshot_data.metadata.snapshot_type == SnapshotType::Incremental {
298            if let Some(parent_id) = &snapshot_data.metadata.parent_id {
299                // Recursively restore parent first
300                let parent_result = Box::pin(self.restore_snapshot(storage, parent_id)).await?;
301                namespaces_restored += parent_result.namespaces_restored;
302                vectors_restored += parent_result.vectors_restored;
303            }
304        }
305
306        // Restore this snapshot's data
307        for ns_snapshot in &snapshot_data.namespaces {
308            storage.ensure_namespace(&ns_snapshot.namespace).await?;
309
310            let vectors: Vec<Vector> = ns_snapshot
311                .vectors
312                .iter()
313                .cloned()
314                .map(Vector::from)
315                .collect();
316
317            storage.upsert(&ns_snapshot.namespace, vectors).await?;
318
319            namespaces_restored += 1;
320            vectors_restored += ns_snapshot.vector_count as u64;
321        }
322
323        Ok(RestoreResult {
324            snapshot_id: snapshot_id.to_string(),
325            namespaces_restored,
326            vectors_restored,
327        })
328    }
329
330    /// List all available snapshots
331    pub fn list_snapshots(&self) -> Result<Vec<SnapshotMetadata>> {
332        let mut snapshots = Vec::new();
333
334        if let Ok(entries) = fs::read_dir(&self.config.snapshot_dir) {
335            for entry in entries.flatten() {
336                let path = entry.path();
337                if path.extension().map(|e| e == "meta").unwrap_or(false) {
338                    if let Ok(metadata) = self.load_metadata_from_path(&path) {
339                        snapshots.push(metadata);
340                    }
341                }
342            }
343        }
344
345        // Sort by creation time (newest first)
346        snapshots.sort_by(|a, b| b.created_at.cmp(&a.created_at));
347
348        Ok(snapshots)
349    }
350
351    /// Get snapshot metadata
352    pub fn get_snapshot_metadata(&self, snapshot_id: &str) -> Result<SnapshotMetadata> {
353        let meta_path = self.metadata_path(snapshot_id);
354        self.load_metadata_from_path(&meta_path)
355    }
356
357    /// Delete a snapshot
358    pub fn delete_snapshot(&self, snapshot_id: &str) -> Result<bool> {
359        let snapshot_path = self.snapshot_path(snapshot_id);
360        let meta_path = self.metadata_path(snapshot_id);
361
362        let mut deleted = false;
363
364        if snapshot_path.exists() {
365            if let Err(e) = fs::remove_file(&snapshot_path) {
366                tracing::warn!(
367                    path = %snapshot_path.display(),
368                    error = %e,
369                    "Failed to remove snapshot file"
370                );
371            } else {
372                deleted = true;
373            }
374        }
375
376        if meta_path.exists() {
377            if let Err(e) = fs::remove_file(&meta_path) {
378                tracing::warn!(
379                    path = %meta_path.display(),
380                    error = %e,
381                    "Failed to remove snapshot metadata file"
382                );
383            } else {
384                deleted = true;
385            }
386        }
387
388        Ok(deleted)
389    }
390
391    /// Check if snapshot exists
392    pub fn snapshot_exists(&self, snapshot_id: &str) -> bool {
393        self.snapshot_path(snapshot_id).exists()
394    }
395
396    // Private methods
397
398    fn generate_snapshot_id(&self) -> String {
399        let timestamp = SystemTime::now()
400            .duration_since(UNIX_EPOCH)
401            .expect("system clock is before UNIX epoch")
402            .as_millis();
403        let counter = SNAPSHOT_COUNTER.fetch_add(1, Ordering::Relaxed);
404        format!("snap_{}_{}", timestamp, counter)
405    }
406
407    fn snapshot_path(&self, snapshot_id: &str) -> PathBuf {
408        self.config
409            .snapshot_dir
410            .join(format!("{}.snap", snapshot_id))
411    }
412
413    fn metadata_path(&self, snapshot_id: &str) -> PathBuf {
414        self.config
415            .snapshot_dir
416            .join(format!("{}.meta", snapshot_id))
417    }
418
419    fn save_snapshot(&self, snapshot_id: &str, data: &SnapshotData) -> Result<u64> {
420        let path = self.snapshot_path(snapshot_id);
421        let file = File::create(&path)
422            .map_err(|e| DakeraError::Storage(format!("Failed to create snapshot: {}", e)))?;
423
424        let writer = BufWriter::new(file);
425
426        if self.config.compression_enabled {
427            // Use simple JSON compression via serde_json's compact format
428            serde_json::to_writer(writer, data)
429                .map_err(|e| DakeraError::Storage(format!("Snapshot serialize error: {}", e)))?;
430        } else {
431            serde_json::to_writer_pretty(writer, data)
432                .map_err(|e| DakeraError::Storage(format!("Snapshot serialize error: {}", e)))?;
433        }
434
435        // Get file size
436        let metadata = fs::metadata(&path)
437            .map_err(|e| DakeraError::Storage(format!("Failed to get snapshot size: {}", e)))?;
438
439        Ok(metadata.len())
440    }
441
442    fn load_snapshot(&self, snapshot_id: &str) -> Result<SnapshotData> {
443        let path = self.snapshot_path(snapshot_id);
444        let file = File::open(&path)
445            .map_err(|e| DakeraError::Storage(format!("Failed to open snapshot: {}", e)))?;
446
447        let reader = BufReader::new(file);
448
449        serde_json::from_reader(reader)
450            .map_err(|e| DakeraError::Storage(format!("Snapshot deserialize error: {}", e)))
451    }
452
453    fn save_metadata(&self, snapshot_id: &str, metadata: &SnapshotMetadata) -> Result<()> {
454        let path = self.metadata_path(snapshot_id);
455        let file = File::create(&path)
456            .map_err(|e| DakeraError::Storage(format!("Failed to create metadata: {}", e)))?;
457
458        let writer = BufWriter::new(file);
459
460        serde_json::to_writer_pretty(writer, metadata)
461            .map_err(|e| DakeraError::Storage(format!("Metadata serialize error: {}", e)))?;
462
463        Ok(())
464    }
465
466    fn load_metadata_from_path(&self, path: &Path) -> Result<SnapshotMetadata> {
467        let file = File::open(path)
468            .map_err(|e| DakeraError::Storage(format!("Failed to open metadata: {}", e)))?;
469
470        let reader = BufReader::new(file);
471
472        serde_json::from_reader(reader)
473            .map_err(|e| DakeraError::Storage(format!("Metadata deserialize error: {}", e)))
474    }
475
476    fn cleanup_old_snapshots(&self) -> Result<()> {
477        let mut snapshots = self.list_snapshots()?;
478
479        // Keep only max_snapshots
480        if snapshots.len() > self.config.max_snapshots {
481            // Snapshots are sorted newest first, so remove from the end
482            let to_remove = snapshots.split_off(self.config.max_snapshots);
483
484            // Collect IDs we actually delete so we can check full parent chains
485            let mut deleted_ids = std::collections::HashSet::new();
486
487            for snapshot in &to_remove {
488                // Don't delete if it's a parent of any kept snapshot
489                let is_parent_of_kept = snapshots
490                    .iter()
491                    .any(|s| s.parent_id.as_ref() == Some(&snapshot.id));
492                // Don't delete if it's a parent of any to-remove snapshot that we're NOT deleting
493                let is_parent_of_remaining = to_remove.iter().any(|s| {
494                    s.parent_id.as_ref() == Some(&snapshot.id) && !deleted_ids.contains(&s.id)
495                });
496
497                if !is_parent_of_kept && !is_parent_of_remaining {
498                    self.delete_snapshot(&snapshot.id)?;
499                    deleted_ids.insert(snapshot.id.clone());
500                }
501            }
502        }
503
504        Ok(())
505    }
506}
507
/// Outcome of a snapshot restore.
///
/// For an incremental snapshot the counts cover every snapshot in its parent
/// chain that was replayed, so a namespace present in several of them is
/// counted once per snapshot.
#[derive(Clone, Debug)]
pub struct RestoreResult {
    /// ID of the snapshot that was requested (not its ancestors).
    pub snapshot_id: String,
    /// Namespace payloads applied, summed over the replayed chain.
    pub namespaces_restored: usize,
    /// Vectors restored, summed over the replayed chain.
    pub vectors_restored: u64,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::InMemoryStorage;
    use tempfile::TempDir;

    /// Config rooted at `dir`, retaining 5 snapshots, pretty JSON.
    fn test_config(dir: &Path) -> SnapshotConfig {
        SnapshotConfig {
            snapshot_dir: dir.to_path_buf(),
            max_snapshots: 5,
            compression_enabled: false,
            include_metadata: true,
        }
    }

    /// Vector `id` with `dim` components, all 1.0, and no metadata or TTL.
    fn create_test_vector(id: &str, dim: usize) -> Vector {
        Vector {
            id: id.to_string(),
            values: vec![1.0; dim],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    #[tokio::test]
    async fn test_create_snapshot() {
        let temp_dir = TempDir::new().unwrap();
        let config = test_config(temp_dir.path());
        let manager = SnapshotManager::new(config).unwrap();

        let storage = InMemoryStorage::new();

        // Add some data
        storage.ensure_namespace(&"test".to_string()).await.unwrap();
        storage
            .upsert(
                &"test".to_string(),
                vec![create_test_vector("v1", 4), create_test_vector("v2", 4)],
            )
            .await
            .unwrap();

        // Create snapshot
        let metadata = manager
            .create_snapshot(&storage, Some("Test snapshot".to_string()))
            .await
            .unwrap();

        assert_eq!(metadata.total_vectors, 2);
        assert_eq!(metadata.namespaces.len(), 1);
        assert_eq!(metadata.snapshot_type, SnapshotType::Full);
    }

    #[tokio::test]
    async fn test_restore_snapshot() {
        let temp_dir = TempDir::new().unwrap();
        let config = test_config(temp_dir.path());
        let manager = SnapshotManager::new(config).unwrap();

        let storage = InMemoryStorage::new();

        // Add data and create snapshot
        storage.ensure_namespace(&"test".to_string()).await.unwrap();
        storage
            .upsert(&"test".to_string(), vec![create_test_vector("v1", 4)])
            .await
            .unwrap();

        let metadata = manager.create_snapshot(&storage, None).await.unwrap();

        // Clear storage
        storage
            .delete(&"test".to_string(), &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(storage.count(&"test".to_string()).await.unwrap(), 0);

        // Restore
        let result = manager
            .restore_snapshot(&storage, &metadata.id)
            .await
            .unwrap();

        assert_eq!(result.vectors_restored, 1);
        assert_eq!(storage.count(&"test".to_string()).await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_restore_incremental_chain() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SnapshotManager::new(test_config(temp_dir.path())).unwrap();
        let storage = InMemoryStorage::new();
        let ns_a = "a".to_string();
        let ns_b = "b".to_string();

        storage.ensure_namespace(&ns_a).await.unwrap();
        storage
            .upsert(&ns_a, vec![create_test_vector("a1", 4)])
            .await
            .unwrap();
        let full = manager.create_snapshot(&storage, None).await.unwrap();

        storage.ensure_namespace(&ns_b).await.unwrap();
        storage
            .upsert(
                &ns_b,
                vec![create_test_vector("b1", 4), create_test_vector("b2", 4)],
            )
            .await
            .unwrap();
        let incremental = manager
            .create_incremental_snapshot(&storage, &full.id, &[ns_b.clone()], None)
            .await
            .unwrap();
        assert_eq!(incremental.snapshot_type, SnapshotType::Incremental);
        assert_eq!(incremental.parent_id.as_deref(), Some(full.id.as_str()));

        storage.delete(&ns_a, &["a1".to_string()]).await.unwrap();
        storage
            .delete(&ns_b, &["b1".to_string(), "b2".to_string()])
            .await
            .unwrap();

        // Restoring the incremental snapshot replays its parent first.
        let result = manager
            .restore_snapshot(&storage, &incremental.id)
            .await
            .unwrap();

        assert_eq!(result.vectors_restored, 3);
        assert_eq!(storage.count(&ns_a).await.unwrap(), 1);
        assert_eq!(storage.count(&ns_b).await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_list_snapshots() {
        let temp_dir = TempDir::new().unwrap();
        let config = test_config(temp_dir.path());
        let manager = SnapshotManager::new(config).unwrap();

        let storage = InMemoryStorage::new();
        storage.ensure_namespace(&"test".to_string()).await.unwrap();
        storage
            .upsert(&"test".to_string(), vec![create_test_vector("v1", 4)])
            .await
            .unwrap();

        // Create multiple snapshots
        for i in 0..3 {
            manager
                .create_snapshot(&storage, Some(format!("Snapshot {}", i)))
                .await
                .unwrap();
        }

        let snapshots = manager.list_snapshots().unwrap();
        assert_eq!(snapshots.len(), 3);

        // Should be sorted newest first
        assert!(snapshots[0].created_at >= snapshots[1].created_at);
    }

    #[tokio::test]
    async fn test_delete_snapshot() {
        let temp_dir = TempDir::new().unwrap();
        let config = test_config(temp_dir.path());
        let manager = SnapshotManager::new(config).unwrap();

        let storage = InMemoryStorage::new();
        storage.ensure_namespace(&"test".to_string()).await.unwrap();
        storage
            .upsert(&"test".to_string(), vec![create_test_vector("v1", 4)])
            .await
            .unwrap();

        let metadata = manager.create_snapshot(&storage, None).await.unwrap();

        assert!(manager.snapshot_exists(&metadata.id));

        assert!(manager.delete_snapshot(&metadata.id).unwrap());

        assert!(!manager.snapshot_exists(&metadata.id));

        // A second delete finds nothing to remove.
        assert!(!manager.delete_snapshot(&metadata.id).unwrap());
    }

    #[tokio::test]
    async fn test_snapshot_cleanup() {
        let temp_dir = TempDir::new().unwrap();
        let mut config = test_config(temp_dir.path());
        config.max_snapshots = 3;
        let manager = SnapshotManager::new(config).unwrap();

        let storage = InMemoryStorage::new();
        storage.ensure_namespace(&"test".to_string()).await.unwrap();
        storage
            .upsert(&"test".to_string(), vec![create_test_vector("v1", 4)])
            .await
            .unwrap();

        // Create more than max_snapshots. IDs stay unique without delays
        // thanks to the per-process counter.
        for _ in 0..5 {
            manager.create_snapshot(&storage, None).await.unwrap();
        }

        // Full snapshots have no dependants, so exactly `max_snapshots` remain.
        let snapshots = manager.list_snapshots().unwrap();
        assert_eq!(snapshots.len(), 3);
    }
}