vectx_storage/
snapshot.rs

1// Snapshot support for persistence with LMDB
2use anyhow::{Result, anyhow};
3use serde::{Deserialize, Serialize};
4use std::fs::{self, File};
5use std::io::{Read, Write, BufReader, BufWriter, Cursor};
6use std::path::{Path, PathBuf};
7use flate2::read::GzDecoder;
8use flate2::write::GzEncoder;
9use flate2::Compression;
10use chrono::{DateTime, Utc};
11use sha2::{Sha256, Digest};
12use tar::Archive;
13
/// Snapshot description for API responses.
///
/// Summarizes a single snapshot file on disk. `SnapshotManager` builds these
/// when a snapshot is created or when snapshots are listed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotDescription {
    /// File name of the snapshot (not a full path).
    pub name: String,
    /// File creation time formatted as `%Y-%m-%dT%H:%M:%SZ`, or `None` when
    /// the filesystem metadata does not provide a usable creation time.
    pub creation_time: Option<String>,
    /// Size of the snapshot file in bytes, taken from the file metadata.
    pub size: u64,
    /// Lowercase hex SHA-256 digest of the snapshot file contents.
    /// Left out of the serialized form when it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub checksum: Option<String>,
}
23
/// Collection snapshot data - contains all points and config.
///
/// This is the structure that is serialized to JSON (and gzip-compressed)
/// to form a vectX snapshot file.
#[derive(Debug, Serialize, Deserialize)]
pub struct CollectionSnapshotData {
    /// Name of the collection; also used as the snapshot sub-directory name
    /// and as the prefix of generated snapshot file names.
    pub name: String,
    /// Collection-level configuration captured with the snapshot.
    pub config: CollectionConfigData,
    /// Every point stored in the snapshot (empty for imported Qdrant archives).
    pub points: Vec<PointData>,
    /// Creation timestamp. The Qdrant import path fills this with seconds
    /// since the Unix epoch; NOTE(review): confirm other producers use the
    /// same unit.
    pub created_at: u64,
}
32
/// Collection configuration stored inside a snapshot.
#[derive(Debug, Serialize, Deserialize)]
pub struct CollectionConfigData {
    /// Dimensionality of the collection's vectors.
    pub vector_dim: usize,
    /// Name of the distance metric as a string (the Qdrant import path
    /// falls back to `"Cosine"` when the archive does not specify one).
    pub distance: String,
    /// Presumably whether an HNSW index is used for the collection;
    /// the Qdrant import path sets it to `true`. TODO confirm with consumers.
    pub use_hnsw: bool,
    /// Presumably whether BM25 text scoring is enabled for the collection;
    /// the Qdrant import path sets it to `false`. TODO confirm with consumers.
    pub enable_bm25: bool,
}
40
/// A single point as stored in a snapshot.
#[derive(Debug, Serialize, Deserialize)]
pub struct PointData {
    /// Point identifier, stored as a string.
    pub id: String,
    /// The point's vector components.
    pub vector: Vec<f32>,
    /// Optional list of vectors for the point; left out of the serialized
    /// form when `None`. NOTE(review): presumably used for multi-vector
    /// points - confirm against the collection code.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub multivector: Option<Vec<Vec<f32>>>,
    /// Optional arbitrary JSON payload attached to the point; left out of
    /// the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub payload: Option<serde_json::Value>,
}
50
/// Creates, lists, loads, deletes and imports snapshot files.
///
/// Snapshots live under a root directory, with one sub-directory per
/// collection.
pub struct SnapshotManager {
    /// Root directory that holds the per-collection snapshot directories.
    snapshot_dir: PathBuf,
}
54
55impl SnapshotManager {
56    pub fn new<P: AsRef<Path>>(snapshot_dir: P) -> Result<Self> {
57        let snapshot_dir = snapshot_dir.as_ref().to_path_buf();
58        fs::create_dir_all(&snapshot_dir)?;
59        Ok(Self { snapshot_dir })
60    }
61
62    /// Get the snapshot directory for a specific collection
63    fn collection_snapshot_dir(&self, collection_name: &str) -> PathBuf {
64        self.snapshot_dir.join(collection_name)
65    }
66
67    /// Generate snapshot filename with timestamp
68    fn generate_snapshot_name(collection_name: &str) -> String {
69        let now: DateTime<Utc> = Utc::now();
70        format!(
71            "{}-{}.snapshot",
72            collection_name,
73            now.format("%Y-%m-%d-%H-%M-%S")
74        )
75    }
76
77    /// Create a snapshot for a collection
78    pub fn create_collection_snapshot(&self, data: CollectionSnapshotData) -> Result<SnapshotDescription> {
79        let collection_dir = self.collection_snapshot_dir(&data.name);
80        fs::create_dir_all(&collection_dir)?;
81
82        let snapshot_name = Self::generate_snapshot_name(&data.name);
83        let snapshot_path = collection_dir.join(&snapshot_name);
84
85        // Serialize to JSON and compress with gzip
86        let json_data = serde_json::to_vec(&data)?;
87        
88        let file = File::create(&snapshot_path)?;
89        let mut encoder = GzEncoder::new(BufWriter::new(file), Compression::default());
90        encoder.write_all(&json_data)?;
91        encoder.finish()?;
92
93        // Calculate checksum
94        let file_data = fs::read(&snapshot_path)?;
95        let checksum = format!("{:x}", Sha256::digest(&file_data));
96
97        let metadata = fs::metadata(&snapshot_path)?;
98        let creation_time = metadata.created()
99            .ok()
100            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
101            .map(|d| {
102                DateTime::from_timestamp(d.as_secs() as i64, 0)
103                    .map(|dt| dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
104            })
105            .flatten();
106
107        Ok(SnapshotDescription {
108            name: snapshot_name,
109            creation_time,
110            size: metadata.len(),
111            checksum: Some(checksum),
112        })
113    }
114
115    /// List all snapshots for a collection
116    pub fn list_collection_snapshots(&self, collection_name: &str) -> Result<Vec<SnapshotDescription>> {
117        let collection_dir = self.collection_snapshot_dir(collection_name);
118        
119        if !collection_dir.exists() {
120            return Ok(Vec::new());
121        }
122
123        let mut snapshots = Vec::new();
124        for entry in fs::read_dir(&collection_dir)? {
125            let entry = entry?;
126            let path = entry.path();
127            
128            if path.extension().and_then(|s| s.to_str()) == Some("snapshot") {
129                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
130                    let metadata = fs::metadata(&path)?;
131                    
132                    // Calculate checksum
133                    let file_data = fs::read(&path)?;
134                    let checksum = format!("{:x}", Sha256::digest(&file_data));
135                    
136                    let creation_time = metadata.created()
137                        .ok()
138                        .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
139                        .map(|d| {
140                            DateTime::from_timestamp(d.as_secs() as i64, 0)
141                                .map(|dt| dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
142                        })
143                        .flatten();
144
145                    snapshots.push(SnapshotDescription {
146                        name: name.to_string(),
147                        creation_time,
148                        size: metadata.len(),
149                        checksum: Some(checksum),
150                    });
151                }
152            }
153        }
154
155        // Sort by name (which includes timestamp)
156        snapshots.sort_by(|a, b| b.name.cmp(&a.name));
157        Ok(snapshots)
158    }
159
160    /// Load a snapshot from file
161    pub fn load_collection_snapshot(&self, collection_name: &str, snapshot_name: &str) -> Result<CollectionSnapshotData> {
162        let snapshot_path = self.collection_snapshot_dir(collection_name).join(snapshot_name);
163        
164        if !snapshot_path.exists() {
165            return Err(anyhow!("Snapshot '{}' not found for collection '{}'", snapshot_name, collection_name));
166        }
167
168        let file = File::open(&snapshot_path)?;
169        let mut decoder = GzDecoder::new(BufReader::new(file));
170        let mut json_data = Vec::new();
171        decoder.read_to_end(&mut json_data)?;
172
173        let data: CollectionSnapshotData = serde_json::from_slice(&json_data)?;
174        Ok(data)
175    }
176
177    /// Delete a snapshot
178    pub fn delete_collection_snapshot(&self, collection_name: &str, snapshot_name: &str) -> Result<bool> {
179        let snapshot_path = self.collection_snapshot_dir(collection_name).join(snapshot_name);
180        
181        if snapshot_path.exists() {
182            fs::remove_file(&snapshot_path)?;
183            Ok(true)
184        } else {
185            Ok(false)
186        }
187    }
188
189    /// Get snapshot file path for download
190    pub fn get_snapshot_path(&self, collection_name: &str, snapshot_name: &str) -> Option<PathBuf> {
191        let path = self.collection_snapshot_dir(collection_name).join(snapshot_name);
192        if path.exists() {
193            Some(path)
194        } else {
195            None
196        }
197    }
198
199    /// Download snapshot from URL and save it
200    /// Supports both vectX and Qdrant snapshot formats
201    pub async fn download_snapshot_from_url(
202        &self,
203        collection_name: &str,
204        url: &str,
205        expected_checksum: Option<&str>,
206    ) -> Result<PathBuf> {
207        let collection_dir = self.collection_snapshot_dir(collection_name);
208        fs::create_dir_all(&collection_dir)?;
209
210        // Extract filename from URL or generate one
211        let filename = url
212            .rsplit('/')
213            .next()
214            .filter(|s| s.ends_with(".snapshot"))
215            .map(|s| s.to_string())
216            .unwrap_or_else(|| Self::generate_snapshot_name(collection_name));
217
218        let snapshot_path = collection_dir.join(&filename);
219
220        // Download using reqwest
221        let response = reqwest::get(url).await
222            .map_err(|e| anyhow!("Failed to download snapshot: {}", e))?;
223
224        if !response.status().is_success() {
225            return Err(anyhow!("Failed to download snapshot: HTTP {}", response.status()));
226        }
227
228        let bytes = response.bytes().await
229            .map_err(|e| anyhow!("Failed to read snapshot data: {}", e))?;
230
231        // Verify checksum if provided
232        if let Some(expected) = expected_checksum {
233            let actual = format!("{:x}", Sha256::digest(&bytes));
234            if actual != expected {
235                return Err(anyhow!(
236                    "Checksum mismatch: expected {}, got {}",
237                    expected,
238                    actual
239                ));
240            }
241        }
242
243        // Save to file
244        fs::write(&snapshot_path, &bytes)?;
245
246        Ok(snapshot_path)
247    }
248
249    /// Load snapshot from a file path (for recovery)
250    /// Supports both vectX format (gzipped JSON) and Qdrant format (tar or tar.gz archive)
251    pub fn load_snapshot_from_path(&self, path: &Path) -> Result<CollectionSnapshotData> {
252        let file_data = fs::read(path)?;
253        
254        // Check if it's gzipped (magic bytes 1f 8b)
255        let data = if file_data.len() > 2 && file_data[0] == 0x1f && file_data[1] == 0x8b {
256            // Gzipped - decompress first
257            let mut decoder = GzDecoder::new(Cursor::new(&file_data));
258            let mut decompressed = Vec::new();
259            decoder.read_to_end(&mut decompressed)?;
260            decompressed
261        } else {
262            // Not gzipped - use raw data
263            file_data
264        };
265
266        // Try to parse as vectX JSON format first
267        if let Ok(snapshot_data) = serde_json::from_slice::<CollectionSnapshotData>(&data) {
268            return Ok(snapshot_data);
269        }
270
271        // Check if it's a tar archive (Qdrant format) - "ustar" at offset 257
272        if data.len() > 262 && &data[257..262] == b"ustar" {
273            return self.try_parse_qdrant_snapshot(&data);
274        }
275
276        Err(anyhow!("Failed to parse snapshot: not a valid vectX or Qdrant snapshot format"))
277    }
278
279    /// Try to parse a Qdrant tar.gz snapshot and extract collection data
280    fn try_parse_qdrant_snapshot(&self, tar_data: &[u8]) -> Result<CollectionSnapshotData> {
281        let cursor = Cursor::new(tar_data);
282        let mut archive = Archive::new(cursor);
283        
284        let mut collection_config: Option<serde_json::Value> = None;
285        let mut collection_name = String::from("imported_collection");
286        
287        // Read through the archive looking for config.json
288        for entry in archive.entries()? {
289            let mut entry = entry?;
290            let path = entry.path()?.to_path_buf();
291            let path_str = path.to_string_lossy();
292            
293            // Look for collection config
294            if path_str.ends_with("config.json") {
295                let mut content = String::new();
296                entry.read_to_string(&mut content)?;
297                if let Ok(config) = serde_json::from_str::<serde_json::Value>(&content) {
298                    collection_config = Some(config);
299                }
300            }
301        }
302
303        // Extract collection configuration
304        let (vector_dim, distance) = if let Some(config) = &collection_config {
305            let params = config.get("params").unwrap_or(config);
306            
307            // Try to get vector dimension and distance
308            let vectors = params.get("vectors");
309            let (dim, dist) = if let Some(v) = vectors {
310                if let Some(size) = v.get("size").and_then(|s| s.as_u64()) {
311                    let distance = v.get("distance")
312                        .and_then(|d| d.as_str())
313                        .unwrap_or("Cosine")
314                        .to_string();
315                    (size as usize, distance)
316                } else {
317                    // Named vectors format
318                    if let Some(obj) = v.as_object() {
319                        if let Some((_, first_vec)) = obj.iter().next() {
320                            let size = first_vec.get("size").and_then(|s| s.as_u64()).unwrap_or(128) as usize;
321                            let distance = first_vec.get("distance")
322                                .and_then(|d| d.as_str())
323                                .unwrap_or("Cosine")
324                                .to_string();
325                            (size, distance)
326                        } else {
327                            (128, "Cosine".to_string())
328                        }
329                    } else {
330                        (128, "Cosine".to_string())
331                    }
332                }
333            } else {
334                (128, "Cosine".to_string())
335            };
336
337            (dim, dist)
338        } else {
339            return Err(anyhow!(
340                "Could not find collection config in Qdrant snapshot. \
341                Note: vectX can read Qdrant snapshot structure but cannot extract points from RocksDB storage. \
342                To migrate data from Qdrant:\n\
343                1. Run Qdrant with the snapshot restored\n\
344                2. Use the scroll API to export all points\n\
345                3. Import them into vectX using the upsert API"
346            ));
347        };
348
349        // We found the config but cannot extract points from RocksDB
350        // Return an empty collection with the right config and a helpful message
351        eprintln!(
352            "Note: Imported Qdrant collection config ({}D vectors, {} distance). \
353            Points cannot be automatically extracted from Qdrant's RocksDB storage. \
354            Please use the Qdrant scroll API to migrate points.",
355            vector_dim, distance
356        );
357
358        Ok(CollectionSnapshotData {
359            name: collection_name,
360            config: CollectionConfigData {
361                vector_dim,
362                distance,
363                use_hnsw: true,
364                enable_bm25: false,
365            },
366            points: Vec::new(), // Empty - points can't be extracted from RocksDB
367            created_at: std::time::SystemTime::now()
368                .duration_since(std::time::UNIX_EPOCH)
369                .map(|d| d.as_secs())
370                .unwrap_or(0),
371        })
372    }
373
374    /// Save uploaded snapshot data to a file
375    pub fn save_uploaded_snapshot(
376        &self,
377        collection_name: &str,
378        data: &[u8],
379        filename: Option<&str>,
380    ) -> Result<PathBuf> {
381        let collection_dir = self.collection_snapshot_dir(collection_name);
382        fs::create_dir_all(&collection_dir)?;
383
384        // Use provided filename or generate one
385        let snapshot_name = filename
386            .filter(|f| f.ends_with(".snapshot"))
387            .map(|f| f.to_string())
388            .unwrap_or_else(|| Self::generate_snapshot_name(collection_name));
389
390        let snapshot_path = collection_dir.join(&snapshot_name);
391
392        // Write the data directly to the file
393        fs::write(&snapshot_path, data)?;
394
395        Ok(snapshot_path)
396    }
397
398    /// List all snapshots across all collections
399    pub fn list_all_snapshots(&self) -> Result<Vec<SnapshotDescription>> {
400        let mut all_snapshots = Vec::new();
401        
402        if !self.snapshot_dir.exists() {
403            return Ok(all_snapshots);
404        }
405
406        for entry in fs::read_dir(&self.snapshot_dir)? {
407            let entry = entry?;
408            if entry.path().is_dir() {
409                if let Some(collection_name) = entry.file_name().to_str() {
410                    let snapshots = self.list_collection_snapshots(collection_name)?;
411                    all_snapshots.extend(snapshots);
412                }
413            }
414        }
415
416        all_snapshots.sort_by(|a, b| b.name.cmp(&a.name));
417        Ok(all_snapshots)
418    }
419}
420