1use anyhow::{Result, anyhow};
3use serde::{Deserialize, Serialize};
4use std::fs::{self, File};
5use std::io::{Read, Write, BufReader, BufWriter, Cursor};
6use std::path::{Path, PathBuf};
7use flate2::read::GzDecoder;
8use flate2::write::GzEncoder;
9use flate2::Compression;
10use chrono::{DateTime, Utc};
11use sha2::{Sha256, Digest};
12use tar::Archive;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct SnapshotDescription {
17 pub name: String,
18 pub creation_time: Option<String>,
19 pub size: u64,
20 #[serde(skip_serializing_if = "Option::is_none")]
21 pub checksum: Option<String>,
22}
23
24#[derive(Debug, Serialize, Deserialize)]
26pub struct CollectionSnapshotData {
27 pub name: String,
28 pub config: CollectionConfigData,
29 pub points: Vec<PointData>,
30 pub created_at: u64,
31}
32
33#[derive(Debug, Serialize, Deserialize)]
34pub struct CollectionConfigData {
35 pub vector_dim: usize,
36 pub distance: String,
37 pub use_hnsw: bool,
38 pub enable_bm25: bool,
39}
40
41#[derive(Debug, Serialize, Deserialize)]
42pub struct PointData {
43 pub id: String,
44 pub vector: Vec<f32>,
45 #[serde(skip_serializing_if = "Option::is_none")]
46 pub multivector: Option<Vec<Vec<f32>>>,
47 #[serde(skip_serializing_if = "Option::is_none")]
48 pub payload: Option<serde_json::Value>,
49}
50
/// Creates, lists, loads, imports and deletes snapshot files on the local filesystem.
///
/// Snapshots are grouped per collection: each collection gets its own
/// sub-directory below the root directory held by the manager.
pub struct SnapshotManager {
    /// Root directory under which the per-collection snapshot directories live.
    snapshot_dir: PathBuf,
}
54
55impl SnapshotManager {
56 pub fn new<P: AsRef<Path>>(snapshot_dir: P) -> Result<Self> {
57 let snapshot_dir = snapshot_dir.as_ref().to_path_buf();
58 fs::create_dir_all(&snapshot_dir)?;
59 Ok(Self { snapshot_dir })
60 }
61
62 fn collection_snapshot_dir(&self, collection_name: &str) -> PathBuf {
64 self.snapshot_dir.join(collection_name)
65 }
66
67 fn generate_snapshot_name(collection_name: &str) -> String {
69 let now: DateTime<Utc> = Utc::now();
70 format!(
71 "{}-{}.snapshot",
72 collection_name,
73 now.format("%Y-%m-%d-%H-%M-%S")
74 )
75 }
76
77 pub fn create_collection_snapshot(&self, data: CollectionSnapshotData) -> Result<SnapshotDescription> {
79 let collection_dir = self.collection_snapshot_dir(&data.name);
80 fs::create_dir_all(&collection_dir)?;
81
82 let snapshot_name = Self::generate_snapshot_name(&data.name);
83 let snapshot_path = collection_dir.join(&snapshot_name);
84
85 let json_data = serde_json::to_vec(&data)?;
87
88 let file = File::create(&snapshot_path)?;
89 let mut encoder = GzEncoder::new(BufWriter::new(file), Compression::default());
90 encoder.write_all(&json_data)?;
91 encoder.finish()?;
92
93 let file_data = fs::read(&snapshot_path)?;
95 let checksum = format!("{:x}", Sha256::digest(&file_data));
96
97 let metadata = fs::metadata(&snapshot_path)?;
98 let creation_time = metadata.created()
99 .ok()
100 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
101 .map(|d| {
102 DateTime::from_timestamp(d.as_secs() as i64, 0)
103 .map(|dt| dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
104 })
105 .flatten();
106
107 Ok(SnapshotDescription {
108 name: snapshot_name,
109 creation_time,
110 size: metadata.len(),
111 checksum: Some(checksum),
112 })
113 }
114
115 pub fn list_collection_snapshots(&self, collection_name: &str) -> Result<Vec<SnapshotDescription>> {
117 let collection_dir = self.collection_snapshot_dir(collection_name);
118
119 if !collection_dir.exists() {
120 return Ok(Vec::new());
121 }
122
123 let mut snapshots = Vec::new();
124 for entry in fs::read_dir(&collection_dir)? {
125 let entry = entry?;
126 let path = entry.path();
127
128 if path.extension().and_then(|s| s.to_str()) == Some("snapshot") {
129 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
130 let metadata = fs::metadata(&path)?;
131
132 let file_data = fs::read(&path)?;
134 let checksum = format!("{:x}", Sha256::digest(&file_data));
135
136 let creation_time = metadata.created()
137 .ok()
138 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
139 .map(|d| {
140 DateTime::from_timestamp(d.as_secs() as i64, 0)
141 .map(|dt| dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
142 })
143 .flatten();
144
145 snapshots.push(SnapshotDescription {
146 name: name.to_string(),
147 creation_time,
148 size: metadata.len(),
149 checksum: Some(checksum),
150 });
151 }
152 }
153 }
154
155 snapshots.sort_by(|a, b| b.name.cmp(&a.name));
157 Ok(snapshots)
158 }
159
160 pub fn load_collection_snapshot(&self, collection_name: &str, snapshot_name: &str) -> Result<CollectionSnapshotData> {
162 let snapshot_path = self.collection_snapshot_dir(collection_name).join(snapshot_name);
163
164 if !snapshot_path.exists() {
165 return Err(anyhow!("Snapshot '{}' not found for collection '{}'", snapshot_name, collection_name));
166 }
167
168 let file = File::open(&snapshot_path)?;
169 let mut decoder = GzDecoder::new(BufReader::new(file));
170 let mut json_data = Vec::new();
171 decoder.read_to_end(&mut json_data)?;
172
173 let data: CollectionSnapshotData = serde_json::from_slice(&json_data)?;
174 Ok(data)
175 }
176
177 pub fn delete_collection_snapshot(&self, collection_name: &str, snapshot_name: &str) -> Result<bool> {
179 let snapshot_path = self.collection_snapshot_dir(collection_name).join(snapshot_name);
180
181 if snapshot_path.exists() {
182 fs::remove_file(&snapshot_path)?;
183 Ok(true)
184 } else {
185 Ok(false)
186 }
187 }
188
189 pub fn get_snapshot_path(&self, collection_name: &str, snapshot_name: &str) -> Option<PathBuf> {
191 let path = self.collection_snapshot_dir(collection_name).join(snapshot_name);
192 if path.exists() {
193 Some(path)
194 } else {
195 None
196 }
197 }
198
199 pub async fn download_snapshot_from_url(
202 &self,
203 collection_name: &str,
204 url: &str,
205 expected_checksum: Option<&str>,
206 ) -> Result<PathBuf> {
207 let collection_dir = self.collection_snapshot_dir(collection_name);
208 fs::create_dir_all(&collection_dir)?;
209
210 let filename = url
212 .rsplit('/')
213 .next()
214 .filter(|s| s.ends_with(".snapshot"))
215 .map(|s| s.to_string())
216 .unwrap_or_else(|| Self::generate_snapshot_name(collection_name));
217
218 let snapshot_path = collection_dir.join(&filename);
219
220 let response = reqwest::get(url).await
222 .map_err(|e| anyhow!("Failed to download snapshot: {}", e))?;
223
224 if !response.status().is_success() {
225 return Err(anyhow!("Failed to download snapshot: HTTP {}", response.status()));
226 }
227
228 let bytes = response.bytes().await
229 .map_err(|e| anyhow!("Failed to read snapshot data: {}", e))?;
230
231 if let Some(expected) = expected_checksum {
233 let actual = format!("{:x}", Sha256::digest(&bytes));
234 if actual != expected {
235 return Err(anyhow!(
236 "Checksum mismatch: expected {}, got {}",
237 expected,
238 actual
239 ));
240 }
241 }
242
243 fs::write(&snapshot_path, &bytes)?;
245
246 Ok(snapshot_path)
247 }
248
249 pub fn load_snapshot_from_path(&self, path: &Path) -> Result<CollectionSnapshotData> {
252 let file_data = fs::read(path)?;
253
254 let data = if file_data.len() > 2 && file_data[0] == 0x1f && file_data[1] == 0x8b {
256 let mut decoder = GzDecoder::new(Cursor::new(&file_data));
258 let mut decompressed = Vec::new();
259 decoder.read_to_end(&mut decompressed)?;
260 decompressed
261 } else {
262 file_data
264 };
265
266 if let Ok(snapshot_data) = serde_json::from_slice::<CollectionSnapshotData>(&data) {
268 return Ok(snapshot_data);
269 }
270
271 if data.len() > 262 && &data[257..262] == b"ustar" {
273 return self.try_parse_qdrant_snapshot(&data);
274 }
275
276 Err(anyhow!("Failed to parse snapshot: not a valid vectX or Qdrant snapshot format"))
277 }
278
279 fn try_parse_qdrant_snapshot(&self, tar_data: &[u8]) -> Result<CollectionSnapshotData> {
281 let cursor = Cursor::new(tar_data);
282 let mut archive = Archive::new(cursor);
283
284 let mut collection_config: Option<serde_json::Value> = None;
285 let mut collection_name = String::from("imported_collection");
286
287 for entry in archive.entries()? {
289 let mut entry = entry?;
290 let path = entry.path()?.to_path_buf();
291 let path_str = path.to_string_lossy();
292
293 if path_str.ends_with("config.json") {
295 let mut content = String::new();
296 entry.read_to_string(&mut content)?;
297 if let Ok(config) = serde_json::from_str::<serde_json::Value>(&content) {
298 collection_config = Some(config);
299 }
300 }
301 }
302
303 let (vector_dim, distance) = if let Some(config) = &collection_config {
305 let params = config.get("params").unwrap_or(config);
306
307 let vectors = params.get("vectors");
309 let (dim, dist) = if let Some(v) = vectors {
310 if let Some(size) = v.get("size").and_then(|s| s.as_u64()) {
311 let distance = v.get("distance")
312 .and_then(|d| d.as_str())
313 .unwrap_or("Cosine")
314 .to_string();
315 (size as usize, distance)
316 } else {
317 if let Some(obj) = v.as_object() {
319 if let Some((_, first_vec)) = obj.iter().next() {
320 let size = first_vec.get("size").and_then(|s| s.as_u64()).unwrap_or(128) as usize;
321 let distance = first_vec.get("distance")
322 .and_then(|d| d.as_str())
323 .unwrap_or("Cosine")
324 .to_string();
325 (size, distance)
326 } else {
327 (128, "Cosine".to_string())
328 }
329 } else {
330 (128, "Cosine".to_string())
331 }
332 }
333 } else {
334 (128, "Cosine".to_string())
335 };
336
337 (dim, dist)
338 } else {
339 return Err(anyhow!(
340 "Could not find collection config in Qdrant snapshot. \
341 Note: vectX can read Qdrant snapshot structure but cannot extract points from RocksDB storage. \
342 To migrate data from Qdrant:\n\
343 1. Run Qdrant with the snapshot restored\n\
344 2. Use the scroll API to export all points\n\
345 3. Import them into vectX using the upsert API"
346 ));
347 };
348
349 eprintln!(
352 "Note: Imported Qdrant collection config ({}D vectors, {} distance). \
353 Points cannot be automatically extracted from Qdrant's RocksDB storage. \
354 Please use the Qdrant scroll API to migrate points.",
355 vector_dim, distance
356 );
357
358 Ok(CollectionSnapshotData {
359 name: collection_name,
360 config: CollectionConfigData {
361 vector_dim,
362 distance,
363 use_hnsw: true,
364 enable_bm25: false,
365 },
366 points: Vec::new(), created_at: std::time::SystemTime::now()
368 .duration_since(std::time::UNIX_EPOCH)
369 .map(|d| d.as_secs())
370 .unwrap_or(0),
371 })
372 }
373
374 pub fn save_uploaded_snapshot(
376 &self,
377 collection_name: &str,
378 data: &[u8],
379 filename: Option<&str>,
380 ) -> Result<PathBuf> {
381 let collection_dir = self.collection_snapshot_dir(collection_name);
382 fs::create_dir_all(&collection_dir)?;
383
384 let snapshot_name = filename
386 .filter(|f| f.ends_with(".snapshot"))
387 .map(|f| f.to_string())
388 .unwrap_or_else(|| Self::generate_snapshot_name(collection_name));
389
390 let snapshot_path = collection_dir.join(&snapshot_name);
391
392 fs::write(&snapshot_path, data)?;
394
395 Ok(snapshot_path)
396 }
397
398 pub fn list_all_snapshots(&self) -> Result<Vec<SnapshotDescription>> {
400 let mut all_snapshots = Vec::new();
401
402 if !self.snapshot_dir.exists() {
403 return Ok(all_snapshots);
404 }
405
406 for entry in fs::read_dir(&self.snapshot_dir)? {
407 let entry = entry?;
408 if entry.path().is_dir() {
409 if let Some(collection_name) = entry.file_name().to_str() {
410 let snapshots = self.list_collection_snapshots(collection_name)?;
411 all_snapshots.extend(snapshots);
412 }
413 }
414 }
415
416 all_snapshots.sort_by(|a, b| b.name.cmp(&a.name));
417 Ok(all_snapshots)
418 }
419}
420