1use anyhow::{anyhow, Result};
39use serde::{Deserialize, Serialize};
40use std::collections::HashMap;
41use std::path::{Path, PathBuf};
42use std::time::SystemTime;
43
44use crate::store::{Metadata, VecStore};
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct Version {
49 pub version: u64,
51 pub vector: Vec<f32>,
53 pub metadata: Metadata,
55 pub timestamp: SystemTime,
57 pub description: Option<String>,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct VersionHistory {
64 pub id: String,
66 pub versions: Vec<Version>,
68 pub current_version: u64,
70}
71
72impl VersionHistory {
73 pub fn new(id: String) -> Self {
75 Self {
76 id,
77 versions: Vec::new(),
78 current_version: 0,
79 }
80 }
81
82 pub fn add_version(
84 &mut self,
85 vector: Vec<f32>,
86 metadata: Metadata,
87 description: Option<String>,
88 ) -> u64 {
89 let version_num = self.versions.len() as u64 + 1;
90
91 self.versions.push(Version {
92 version: version_num,
93 vector,
94 metadata,
95 timestamp: SystemTime::now(),
96 description,
97 });
98
99 self.current_version = version_num;
100 version_num
101 }
102
103 pub fn get_current(&self) -> Option<&Version> {
105 self.versions
106 .iter()
107 .find(|v| v.version == self.current_version)
108 }
109
110 pub fn get_version(&self, version: u64) -> Option<&Version> {
112 self.versions.iter().find(|v| v.version == version)
113 }
114
115 pub fn rollback(&mut self, version: u64) -> Result<()> {
117 if self.get_version(version).is_some() {
118 self.current_version = version;
119 Ok(())
120 } else {
121 Err(anyhow!("Version {} not found", version))
122 }
123 }
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct Snapshot {
129 pub name: String,
131 pub timestamp: SystemTime,
133 pub description: Option<String>,
135 pub states: HashMap<String, (Vec<f32>, Metadata, u64)>, }
138
139pub struct VersionedStore {
141 store: VecStore,
143 history: HashMap<String, VersionHistory>,
145 snapshots: HashMap<String, Snapshot>,
147 path: PathBuf,
149}
150
151impl VersionedStore {
152 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
154 let path_buf = path.as_ref().to_path_buf();
155 let store = VecStore::open(&path_buf)?;
156
157 let history_path = path_buf.with_extension("history");
159 let history = if history_path.exists() {
160 let data = std::fs::read_to_string(&history_path)?;
161 serde_json::from_str(&data)?
162 } else {
163 HashMap::new()
164 };
165
166 let snapshots_path = path_buf.with_extension("snapshots");
168 let snapshots = if snapshots_path.exists() {
169 let data = std::fs::read_to_string(&snapshots_path)?;
170 serde_json::from_str(&data)?
171 } else {
172 HashMap::new()
173 };
174
175 Ok(Self {
176 store,
177 history,
178 snapshots,
179 path: path_buf,
180 })
181 }
182
183 pub fn insert(
185 &mut self,
186 id: impl Into<String>,
187 vector: Vec<f32>,
188 metadata: Metadata,
189 ) -> Result<u64> {
190 let id = id.into();
191
192 self.store
194 .upsert(id.clone(), vector.clone(), metadata.clone())?;
195
196 let mut history = VersionHistory::new(id.clone());
198 let version = history.add_version(vector, metadata, Some("Initial version".to_string()));
199
200 self.history.insert(id, history);
201 self.save_history()?;
202
203 Ok(version)
204 }
205
206 pub fn update(
208 &mut self,
209 id: &str,
210 vector: Vec<f32>,
211 metadata: Metadata,
212 description: Option<String>,
213 ) -> Result<u64> {
214 let history = self
215 .history
216 .get_mut(id)
217 .ok_or_else(|| anyhow!("Vector {} not found", id))?;
218
219 self.store
221 .upsert(id.to_string(), vector.clone(), metadata.clone())?;
222
223 let version = history.add_version(vector, metadata, description);
225 self.save_history()?;
226
227 Ok(version)
228 }
229
230 pub fn rollback(&mut self, id: &str, version: u64) -> Result<()> {
232 let history = self
233 .history
234 .get_mut(id)
235 .ok_or_else(|| anyhow!("Vector {} not found", id))?;
236
237 let target = history
239 .get_version(version)
240 .ok_or_else(|| anyhow!("Version {} not found", version))?;
241
242 self.store.upsert(
244 id.to_string(),
245 target.vector.clone(),
246 target.metadata.clone(),
247 )?;
248
249 history.rollback(version)?;
251 self.save_history()?;
252
253 Ok(())
254 }
255
256 pub fn get_history(&self, id: &str) -> Option<&VersionHistory> {
258 self.history.get(id)
259 }
260
261 pub fn get_current_version(&self, id: &str) -> Option<&Version> {
263 self.history.get(id).and_then(|h| h.get_current())
264 }
265
266 pub fn create_snapshot(
268 &mut self,
269 name: impl Into<String>,
270 description: Option<String>,
271 ) -> Result<()> {
272 let name = name.into();
273
274 let mut states = HashMap::new();
276 for (id, history) in &self.history {
277 if let Some(current) = history.get_current() {
278 states.insert(
279 id.clone(),
280 (
281 current.vector.clone(),
282 current.metadata.clone(),
283 current.version,
284 ),
285 );
286 }
287 }
288
289 let snapshot = Snapshot {
290 name: name.clone(),
291 timestamp: SystemTime::now(),
292 description,
293 states,
294 };
295
296 self.snapshots.insert(name, snapshot);
297 self.save_snapshots()?;
298
299 Ok(())
300 }
301
302 pub fn restore_snapshot(&mut self, name: &str) -> Result<()> {
304 let snapshot = self
305 .snapshots
306 .get(name)
307 .ok_or_else(|| anyhow!("Snapshot {} not found", name))?
308 .clone();
309
310 for (id, (vector, metadata, version)) in snapshot.states {
312 self.store
314 .upsert(id.clone(), vector.clone(), metadata.clone())?;
315
316 if let Some(history) = self.history.get_mut(&id) {
318 history.rollback(version)?;
319 }
320 }
321
322 self.save_history()?;
323
324 Ok(())
325 }
326
327 pub fn list_snapshots(&self) -> Vec<&Snapshot> {
329 self.snapshots.values().collect()
330 }
331
332 pub fn delete_snapshot(&mut self, name: &str) -> Result<()> {
334 self.snapshots
335 .remove(name)
336 .ok_or_else(|| anyhow!("Snapshot {} not found", name))?;
337 self.save_snapshots()?;
338 Ok(())
339 }
340
341 pub fn compare_versions(&self, id: &str, v1: u64, v2: u64) -> Result<VersionDiff> {
343 let history = self
344 .history
345 .get(id)
346 .ok_or_else(|| anyhow!("Vector {} not found", id))?;
347
348 let version1 = history
349 .get_version(v1)
350 .ok_or_else(|| anyhow!("Version {} not found", v1))?;
351 let version2 = history
352 .get_version(v2)
353 .ok_or_else(|| anyhow!("Version {} not found", v2))?;
354
355 let vector_changed = version1.vector != version2.vector;
357 let metadata_changed = serde_json::to_string(&version1.metadata)?
358 != serde_json::to_string(&version2.metadata)?;
359
360 let vector_distance = if vector_changed {
361 let dist: f32 = version1
362 .vector
363 .iter()
364 .zip(&version2.vector)
365 .map(|(a, b)| (a - b).powi(2))
366 .sum::<f32>()
367 .sqrt();
368 Some(dist)
369 } else {
370 None
371 };
372
373 Ok(VersionDiff {
374 id: id.to_string(),
375 version1: v1,
376 version2: v2,
377 vector_changed,
378 metadata_changed,
379 vector_distance,
380 })
381 }
382
383 pub fn store(&self) -> &VecStore {
385 &self.store
386 }
387
388 pub fn store_mut(&mut self) -> &mut VecStore {
390 &mut self.store
391 }
392
393 fn save_history(&self) -> Result<()> {
395 let path = self.path.with_extension("history");
396 let data = serde_json::to_string(&self.history)?;
397 std::fs::write(path, data)?;
398 Ok(())
399 }
400
401 fn save_snapshots(&self) -> Result<()> {
403 let path = self.path.with_extension("snapshots");
404 let data = serde_json::to_string(&self.snapshots)?;
405 std::fs::write(path, data)?;
406 Ok(())
407 }
408
409 pub fn total_versions(&self) -> usize {
411 self.history.values().map(|h| h.versions.len()).sum()
412 }
413
414 pub fn stats(&self) -> VersioningStats {
416 let total_vectors = self.history.len();
417 let total_versions = self.total_versions();
418 let total_snapshots = self.snapshots.len();
419
420 let avg_versions_per_vector = if total_vectors > 0 {
421 total_versions as f32 / total_vectors as f32
422 } else {
423 0.0
424 };
425
426 VersioningStats {
427 total_vectors,
428 total_versions,
429 total_snapshots,
430 avg_versions_per_vector,
431 }
432 }
433}
434
435#[derive(Debug, Clone, Serialize, Deserialize)]
437pub struct VersionDiff {
438 pub id: String,
440 pub version1: u64,
442 pub version2: u64,
444 pub vector_changed: bool,
446 pub metadata_changed: bool,
448 pub vector_distance: Option<f32>,
450}
451
452#[derive(Debug, Clone, Serialize, Deserialize)]
454pub struct VersioningStats {
455 pub total_vectors: usize,
457 pub total_versions: usize,
459 pub total_snapshots: usize,
461 pub avg_versions_per_vector: f32,
463}
464
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Builds metadata holding a single `"value"` field.
    fn create_metadata(value: &str) -> Metadata {
        let fields: HashMap<_, _> =
            std::iter::once(("value".to_string(), serde_json::json!(value))).collect();
        Metadata { fields }
    }

    /// Opens a fresh store backed by `test.db` inside `dir`.
    fn open_store(dir: &TempDir) -> Result<VersionedStore> {
        VersionedStore::new(dir.path().join("test.db"))
    }

    #[test]
    fn test_basic_versioning() -> Result<()> {
        let dir = TempDir::new()?;
        let mut store = open_store(&dir)?;

        let first = store.insert("doc1", vec![1.0, 2.0], create_metadata("v1"))?;
        assert_eq!(first, 1);

        let second = store.update(
            "doc1",
            vec![1.1, 2.1],
            create_metadata("v2"),
            Some("Updated".to_string()),
        )?;
        assert_eq!(second, 2);

        let history = store.get_history("doc1").unwrap();
        assert_eq!(history.versions.len(), 2);
        assert_eq!(history.current_version, 2);

        Ok(())
    }

    #[test]
    fn test_rollback() -> Result<()> {
        let dir = TempDir::new()?;
        let mut store = open_store(&dir)?;

        store.insert("doc1", vec![1.0, 2.0], create_metadata("v1"))?;
        for (vector, label) in vec![(vec![2.0, 3.0], "v2"), (vec![3.0, 4.0], "v3")] {
            store.update("doc1", vector, create_metadata(label), None)?;
        }

        store.rollback("doc1", 1)?;

        let current = store.get_current_version("doc1").unwrap();
        assert_eq!(current.version, 1);
        assert_eq!(current.vector, vec![1.0, 2.0]);

        Ok(())
    }

    #[test]
    fn test_snapshots() -> Result<()> {
        let dir = TempDir::new()?;
        let mut store = open_store(&dir)?;

        store.insert("doc1", vec![1.0, 2.0], create_metadata("v1"))?;
        store.insert("doc2", vec![3.0, 4.0], create_metadata("v1"))?;
        store.create_snapshot("checkpoint1", Some("Before changes".to_string()))?;

        store.update("doc1", vec![5.0, 6.0], create_metadata("v2"), None)?;
        store.update("doc2", vec![7.0, 8.0], create_metadata("v2"), None)?;
        store.restore_snapshot("checkpoint1")?;

        let restored = store.get_current_version("doc1").unwrap();
        assert_eq!(restored.vector, vec![1.0, 2.0]);

        Ok(())
    }

    #[test]
    fn test_compare_versions() -> Result<()> {
        let dir = TempDir::new()?;
        let mut store = open_store(&dir)?;

        store.insert("doc1", vec![1.0, 0.0], create_metadata("v1"))?;
        store.update("doc1", vec![0.0, 1.0], create_metadata("v2"), None)?;

        let diff = store.compare_versions("doc1", 1, 2)?;
        assert!(diff.vector_changed);
        assert!(diff.vector_distance.unwrap() > 0.0);

        Ok(())
    }

    #[test]
    fn test_persistence() -> Result<()> {
        let dir = TempDir::new()?;
        let db_path = dir.path().join("test.db");

        // First session: record history and a snapshot, then drop the store.
        {
            let mut writer = VersionedStore::new(&db_path)?;
            writer.insert("doc1", vec![1.0, 2.0], create_metadata("v1"))?;
            writer.update("doc1", vec![2.0, 3.0], create_metadata("v2"), None)?;
            writer.create_snapshot("snap1", None)?;
        }

        // Second session: everything must be reloaded from disk.
        {
            let reader = VersionedStore::new(&db_path)?;
            let history = reader.get_history("doc1").unwrap();
            assert_eq!(history.versions.len(), 2);
            assert_eq!(reader.list_snapshots().len(), 1);
        }

        Ok(())
    }
}