use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

use anyhow::{anyhow, Context, Result};
use serde::{Deserialize, Serialize};

use crate::store::{Metadata, VecStore};
/// One recorded state of a vector: its data, metadata, and when it was recorded.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Version {
    /// 1-based version number, assigned sequentially by `VersionHistory::add_version`.
    pub version: u64,
    /// Vector payload stored for this version.
    pub vector: Vec<f32>,
    /// Metadata stored alongside the vector for this version.
    pub metadata: Metadata,
    /// Wall-clock time (`SystemTime::now()`) at which this version was added.
    pub timestamp: SystemTime,
    /// Optional human-readable note describing the change.
    pub description: Option<String>,
}
/// All versions recorded for a single vector id, plus the version currently in effect.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionHistory {
    /// Id of the vector this history belongs to.
    pub id: String,
    /// Versions in the order they were added. The methods in this module only
    /// append to it; a rollback moves `current_version` without dropping
    /// later versions.
    pub versions: Vec<Version>,
    /// Version number currently in effect; 0 for a freshly created, empty history.
    pub current_version: u64,
}
impl VersionHistory {
pub fn new(id: String) -> Self {
Self {
id,
versions: Vec::new(),
current_version: 0,
}
}
pub fn add_version(
&mut self,
vector: Vec<f32>,
metadata: Metadata,
description: Option<String>,
) -> u64 {
let version_num = self.versions.len() as u64 + 1;
self.versions.push(Version {
version: version_num,
vector,
metadata,
timestamp: SystemTime::now(),
description,
});
self.current_version = version_num;
version_num
}
pub fn get_current(&self) -> Option<&Version> {
self.versions
.iter()
.find(|v| v.version == self.current_version)
}
pub fn get_version(&self, version: u64) -> Option<&Version> {
self.versions.iter().find(|v| v.version == version)
}
pub fn rollback(&mut self, version: u64) -> Result<()> {
if self.get_version(version).is_some() {
self.current_version = version;
Ok(())
} else {
Err(anyhow!("Version {} not found", version))
}
}
}
/// A named, point-in-time capture of the current version of every tracked vector.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
    /// Name the snapshot is stored under.
    pub name: String,
    /// Wall-clock time at which the snapshot was created.
    pub timestamp: SystemTime,
    /// Optional human-readable note supplied at creation time.
    pub description: Option<String>,
    /// Per-vector state keyed by vector id, as a `(vector, metadata, version number)`
    /// tuple taken from each history's current version at creation time.
    /// NOTE(review): the tuple serializes as a JSON array; replacing it with a
    /// named struct would change the persisted snapshot format.
    pub states: HashMap<String, (Vec<f32>, Metadata, u64)>,
}
/// A `VecStore` wrapper that records writes made through its `insert`/`update`
/// methods as versions, and supports per-vector rollback and whole-store snapshots.
pub struct VersionedStore {
    /// Underlying vector store holding the data currently in effect for each id.
    store: VecStore,
    /// Version history per vector id; persisted as JSON at `path` with its
    /// extension replaced by `history`.
    history: HashMap<String, VersionHistory>,
    /// Named snapshots; persisted as JSON at `path` with its extension
    /// replaced by `snapshots`.
    snapshots: HashMap<String, Snapshot>,
    /// Path the store was opened with; sidecar file paths are derived from it.
    path: PathBuf,
}
impl VersionedStore {
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
let path_buf = path.as_ref().to_path_buf();
let store = VecStore::open(&path_buf)?;
let history_path = path_buf.with_extension("history");
let history = if history_path.exists() {
let data = std::fs::read_to_string(&history_path)?;
serde_json::from_str(&data)?
} else {
HashMap::new()
};
let snapshots_path = path_buf.with_extension("snapshots");
let snapshots = if snapshots_path.exists() {
let data = std::fs::read_to_string(&snapshots_path)?;
serde_json::from_str(&data)?
} else {
HashMap::new()
};
Ok(Self {
store,
history,
snapshots,
path: path_buf,
})
}
pub fn insert(
&mut self,
id: impl Into<String>,
vector: Vec<f32>,
metadata: Metadata,
) -> Result<u64> {
let id = id.into();
self.store
.upsert(id.clone(), vector.clone(), metadata.clone())?;
let mut history = VersionHistory::new(id.clone());
let version = history.add_version(vector, metadata, Some("Initial version".to_string()));
self.history.insert(id, history);
self.save_history()?;
Ok(version)
}
pub fn update(
&mut self,
id: &str,
vector: Vec<f32>,
metadata: Metadata,
description: Option<String>,
) -> Result<u64> {
let history = self
.history
.get_mut(id)
.ok_or_else(|| anyhow!("Vector {} not found", id))?;
self.store
.upsert(id.to_string(), vector.clone(), metadata.clone())?;
let version = history.add_version(vector, metadata, description);
self.save_history()?;
Ok(version)
}
pub fn rollback(&mut self, id: &str, version: u64) -> Result<()> {
let history = self
.history
.get_mut(id)
.ok_or_else(|| anyhow!("Vector {} not found", id))?;
let target = history
.get_version(version)
.ok_or_else(|| anyhow!("Version {} not found", version))?;
self.store.upsert(
id.to_string(),
target.vector.clone(),
target.metadata.clone(),
)?;
history.rollback(version)?;
self.save_history()?;
Ok(())
}
pub fn get_history(&self, id: &str) -> Option<&VersionHistory> {
self.history.get(id)
}
pub fn get_current_version(&self, id: &str) -> Option<&Version> {
self.history.get(id).and_then(|h| h.get_current())
}
pub fn create_snapshot(
&mut self,
name: impl Into<String>,
description: Option<String>,
) -> Result<()> {
let name = name.into();
let mut states = HashMap::new();
for (id, history) in &self.history {
if let Some(current) = history.get_current() {
states.insert(
id.clone(),
(
current.vector.clone(),
current.metadata.clone(),
current.version,
),
);
}
}
let snapshot = Snapshot {
name: name.clone(),
timestamp: SystemTime::now(),
description,
states,
};
self.snapshots.insert(name, snapshot);
self.save_snapshots()?;
Ok(())
}
pub fn restore_snapshot(&mut self, name: &str) -> Result<()> {
let snapshot = self
.snapshots
.get(name)
.ok_or_else(|| anyhow!("Snapshot {} not found", name))?
.clone();
for (id, (vector, metadata, version)) in snapshot.states {
self.store
.upsert(id.clone(), vector.clone(), metadata.clone())?;
if let Some(history) = self.history.get_mut(&id) {
history.rollback(version)?;
}
}
self.save_history()?;
Ok(())
}
pub fn list_snapshots(&self) -> Vec<&Snapshot> {
self.snapshots.values().collect()
}
pub fn delete_snapshot(&mut self, name: &str) -> Result<()> {
self.snapshots
.remove(name)
.ok_or_else(|| anyhow!("Snapshot {} not found", name))?;
self.save_snapshots()?;
Ok(())
}
pub fn compare_versions(&self, id: &str, v1: u64, v2: u64) -> Result<VersionDiff> {
let history = self
.history
.get(id)
.ok_or_else(|| anyhow!("Vector {} not found", id))?;
let version1 = history
.get_version(v1)
.ok_or_else(|| anyhow!("Version {} not found", v1))?;
let version2 = history
.get_version(v2)
.ok_or_else(|| anyhow!("Version {} not found", v2))?;
let vector_changed = version1.vector != version2.vector;
let metadata_changed = serde_json::to_string(&version1.metadata)?
!= serde_json::to_string(&version2.metadata)?;
let vector_distance = if vector_changed {
let dist: f32 = version1
.vector
.iter()
.zip(&version2.vector)
.map(|(a, b)| (a - b).powi(2))
.sum::<f32>()
.sqrt();
Some(dist)
} else {
None
};
Ok(VersionDiff {
id: id.to_string(),
version1: v1,
version2: v2,
vector_changed,
metadata_changed,
vector_distance,
})
}
pub fn store(&self) -> &VecStore {
&self.store
}
pub fn store_mut(&mut self) -> &mut VecStore {
&mut self.store
}
fn save_history(&self) -> Result<()> {
let path = self.path.with_extension("history");
let data = serde_json::to_string(&self.history)?;
std::fs::write(path, data)?;
Ok(())
}
fn save_snapshots(&self) -> Result<()> {
let path = self.path.with_extension("snapshots");
let data = serde_json::to_string(&self.snapshots)?;
std::fs::write(path, data)?;
Ok(())
}
pub fn total_versions(&self) -> usize {
self.history.values().map(|h| h.versions.len()).sum()
}
pub fn stats(&self) -> VersioningStats {
let total_vectors = self.history.len();
let total_versions = self.total_versions();
let total_snapshots = self.snapshots.len();
let avg_versions_per_vector = if total_vectors > 0 {
total_versions as f32 / total_vectors as f32
} else {
0.0
};
VersioningStats {
total_vectors,
total_versions,
total_snapshots,
avg_versions_per_vector,
}
}
}
/// Result of comparing two versions of the same vector via
/// `VersionedStore::compare_versions`.
///
/// Derives `PartialEq` so diffs can be compared directly (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct VersionDiff {
    /// Id of the vector that was compared.
    pub id: String,
    /// First version number passed to the comparison.
    pub version1: u64,
    /// Second version number passed to the comparison.
    pub version2: u64,
    /// Whether the two vectors differ (element-wise or in length).
    pub vector_changed: bool,
    /// Whether the two versions' metadata differ.
    pub metadata_changed: bool,
    /// Euclidean distance between the two vectors; `Some` only when the vectors differ.
    pub vector_distance: Option<f32>,
}
/// Aggregate counts describing a `VersionedStore`, as produced by `VersionedStore::stats`.
///
/// Derives `PartialEq` so stats can be compared directly (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct VersioningStats {
    /// Number of vector ids that have a version history.
    pub total_vectors: usize,
    /// Total number of versions across all histories.
    pub total_versions: usize,
    /// Number of stored snapshots.
    pub total_snapshots: usize,
    /// `total_versions / total_vectors`, or `0.0` when there are no vectors.
    pub avg_versions_per_vector: f32,
}
#[cfg(test)]
mod tests {
    //! Unit tests covering versioning, rollback, snapshots, diffing and persistence.
    use super::*;
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Builds metadata whose single `"value"` field holds `value` as a JSON string.
    fn create_metadata(value: &str) -> Metadata {
        Metadata {
            fields: HashMap::from([("value".to_string(), serde_json::json!(value))]),
        }
    }

    /// Opens a `VersionedStore` backed by `test.db` inside `dir`.
    fn open_store(dir: &TempDir) -> Result<VersionedStore> {
        VersionedStore::new(dir.path().join("test.db"))
    }

    #[test]
    fn test_basic_versioning() -> Result<()> {
        let dir = TempDir::new()?;
        let mut vs = open_store(&dir)?;

        let first = vs.insert("doc1", vec![1.0, 2.0], create_metadata("v1"))?;
        assert_eq!(first, 1);

        let second = vs.update(
            "doc1",
            vec![1.1, 2.1],
            create_metadata("v2"),
            Some("Updated".to_string()),
        )?;
        assert_eq!(second, 2);

        let hist = vs.get_history("doc1").unwrap();
        assert_eq!(hist.versions.len(), 2);
        assert_eq!(hist.current_version, 2);
        Ok(())
    }

    #[test]
    fn test_rollback() -> Result<()> {
        let dir = TempDir::new()?;
        let mut vs = open_store(&dir)?;

        vs.insert("doc1", vec![1.0, 2.0], create_metadata("v1"))?;
        for (vector, tag) in [(vec![2.0, 3.0], "v2"), (vec![3.0, 4.0], "v3")] {
            vs.update("doc1", vector, create_metadata(tag), None)?;
        }

        vs.rollback("doc1", 1)?;
        let active = vs.get_current_version("doc1").unwrap();
        assert_eq!(active.version, 1);
        assert_eq!(active.vector, vec![1.0, 2.0]);
        Ok(())
    }

    #[test]
    fn test_snapshots() -> Result<()> {
        let dir = TempDir::new()?;
        let mut vs = open_store(&dir)?;

        vs.insert("doc1", vec![1.0, 2.0], create_metadata("v1"))?;
        vs.insert("doc2", vec![3.0, 4.0], create_metadata("v1"))?;
        vs.create_snapshot("checkpoint1", Some("Before changes".to_string()))?;

        vs.update("doc1", vec![5.0, 6.0], create_metadata("v2"), None)?;
        vs.update("doc2", vec![7.0, 8.0], create_metadata("v2"), None)?;

        vs.restore_snapshot("checkpoint1")?;
        let restored = vs.get_current_version("doc1").unwrap();
        assert_eq!(restored.vector, vec![1.0, 2.0]);
        Ok(())
    }

    #[test]
    fn test_compare_versions() -> Result<()> {
        let dir = TempDir::new()?;
        let mut vs = open_store(&dir)?;

        vs.insert("doc1", vec![1.0, 0.0], create_metadata("v1"))?;
        vs.update("doc1", vec![0.0, 1.0], create_metadata("v2"), None)?;

        let report = vs.compare_versions("doc1", 1, 2)?;
        assert!(report.vector_changed);
        assert!(matches!(report.vector_distance, Some(d) if d > 0.0));
        Ok(())
    }

    #[test]
    fn test_persistence() -> Result<()> {
        let dir = TempDir::new()?;
        let db_file = dir.path().join("test.db");

        // First session: write two versions and a snapshot, then drop the store.
        {
            let mut vs = VersionedStore::new(&db_file)?;
            vs.insert("doc1", vec![1.0, 2.0], create_metadata("v1"))?;
            vs.update("doc1", vec![2.0, 3.0], create_metadata("v2"), None)?;
            vs.create_snapshot("snap1", None)?;
        }

        // Second session: everything must be reloaded from the sidecar files.
        let reopened = VersionedStore::new(&db_file)?;
        let hist = reopened.get_history("doc1").unwrap();
        assert_eq!(hist.versions.len(), 2);
        assert_eq!(reopened.list_snapshots().len(), 1);
        Ok(())
    }
}