use anyhow::{anyhow, Result};
use parking_lot::Mutex;
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use vecstore::{Metadata, Query, VecStore};
pub use vecstore::Neighbor as SearchResult;
use crate::storage::embedding::EmbeddingEngine;
use crate::storage::fingerprint::json_fingerprint;
/// Handle to a lazily opened vector store with an optional embedding engine.
///
/// Every field except `path` sits behind an `Arc`, so clones of a
/// `VectorEngine` share the same store, embedding engine and dirty flag.
#[derive(Clone)]
pub struct VectorEngine {
/// Location handed to `VecStore::open` the first time the store is needed.
path: String,
/// The underlying store; `None` until the first operation opens it.
store: Arc<Mutex<Option<VecStore>>>,
/// Engine used to embed JSON documents; when `None`, document writes are skipped.
embedding: Option<Arc<EmbeddingEngine>>,
/// Set after a successful write and cleared by `sync`, so `sync` can skip saving
/// when nothing has changed.
dirty: Arc<AtomicBool>,
}
impl VectorEngine {
    /// Creates an engine for the vector store at `path` that embeds documents with `engine`.
    ///
    /// The store is not opened here; it is opened by the first operation that needs it.
    ///
    /// # Errors
    ///
    /// The current implementation never returns `Err`; the `Result` is part of the
    /// existing signature.
    pub fn with_embedding(path: &str, engine: EmbeddingEngine) -> Result<Self> {
        Ok(Self {
            path: path.to_string(),
            store: Arc::new(Mutex::new(None)),
            embedding: Some(Arc::new(engine)),
            dirty: Arc::new(AtomicBool::new(false)),
        })
    }

    /// Embeds `document` and upserts it into the store under `id`.
    ///
    /// The vector is the embedding of `json_fingerprint(&document)`; the document
    /// itself is stored as metadata via `json_to_metadata`. When no embedding
    /// engine is configured this is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Fails if embedding fails, the store cannot be opened, or the upsert fails.
    pub fn store_document(&self, id: &str, document: JsonValue) -> Result<()> {
        let Some(engine) = self.embedding.as_deref() else {
            return Ok(());
        };
        let fingerprint = json_fingerprint(&document);
        let vector = engine.embed(&fingerprint)?;
        let meta = json_to_metadata(document);
        self.with_store(|store| {
            store
                .upsert(id.to_string(), vector, meta)
                .map_err(|e| anyhow!("failed to store document {id:?}: {e}"))?;
            self.dirty.store(true, Ordering::Release);
            Ok(())
        })
    }

    /// Embeds every document in `entries` with one `embed_batch` call and upserts them in order.
    ///
    /// When no embedding engine is configured, or `entries` is empty, this is a
    /// no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Fails if embedding fails, the store cannot be opened, an upsert fails, or the
    /// embedding engine yields fewer vectors than there are entries. Entries that
    /// were upserted before the failure remain in the store and are still marked
    /// as pending for the next [`sync`](Self::sync).
    pub fn store_documents_batch(&self, entries: &[(&str, JsonValue)]) -> Result<()> {
        let Some(engine) = self.embedding.as_deref() else {
            return Ok(());
        };
        if entries.is_empty() {
            return Ok(());
        }
        let fingerprints: Vec<String> = entries
            .iter()
            .map(|(_, doc)| json_fingerprint(doc))
            .collect();
        let fp_refs: Vec<&str> = fingerprints.iter().map(String::as_str).collect();
        let mut vectors = engine.embed_batch(&fp_refs)?.into_iter();
        self.with_store(|store| {
            for (id, doc) in entries {
                // Pull vectors one at a time instead of zipping: a zip would
                // silently drop every document past the last returned vector.
                let vector = vectors.next().ok_or_else(|| {
                    anyhow!("embedding engine returned no vector for document {id:?}")
                })?;
                store
                    .upsert((*id).to_string(), vector, json_to_metadata(doc.clone()))
                    .map_err(|e| anyhow!("failed to store document {id:?}: {e}"))?;
                // Mark dirty per successful write so that a failure later in the
                // batch cannot leave already-applied upserts unsaved by `sync`.
                self.dirty.store(true, Ordering::Release);
            }
            Ok(())
        })
    }

    /// Removes the vector stored under `id`.
    ///
    /// A removal error whose message contains "not found" (compared
    /// case-insensitively) is treated as success and does not mark the store dirty.
    ///
    /// # Errors
    ///
    /// Fails if the store cannot be opened or the removal fails for any other reason.
    pub fn delete_vector(&self, id: &str) -> Result<()> {
        self.with_store(|store| match store.remove(id) {
            Ok(()) => {
                self.dirty.store(true, Ordering::Release);
                Ok(())
            }
            Err(e) if e.to_string().to_lowercase().contains("not found") => Ok(()),
            Err(e) => Err(anyhow!("failed to remove vector {id:?}: {e}")),
        })
    }

    /// Queries the store with `query_vector`, requesting at most `limit` results.
    ///
    /// Scores in the returned results have been passed through
    /// `distance_to_similarity`.
    ///
    /// # Errors
    ///
    /// Fails if the store cannot be opened or the query fails.
    pub fn search(&self, query_vector: Vec<f32>, limit: usize) -> Result<Vec<SearchResult>> {
        let query = Query::new(query_vector).with_limit(limit);
        let mut results = self.with_store(|store| {
            store
                .query(query)
                .map_err(|e| anyhow!("vector search failed: {e}"))
        })?;
        distance_to_similarity(&mut results);
        Ok(results)
    }

    /// Embeds `query` the same way documents are embedded and runs [`search`](Self::search).
    ///
    /// # Errors
    ///
    /// Fails if no embedding engine is configured, if embedding fails, or if the
    /// search itself fails.
    pub fn search_json(&self, query: &JsonValue, limit: usize) -> Result<Vec<SearchResult>> {
        let engine = self
            .embedding
            .as_deref()
            .ok_or_else(|| anyhow!("search_json requires an EmbeddingEngine"))?;
        let fingerprint = json_fingerprint(query);
        let vector = engine.embed(&fingerprint)?;
        self.search(vector, limit)
    }

    /// Saves the store if any write has happened since the last successful save.
    ///
    /// # Errors
    ///
    /// Fails if saving fails; the dirty flag is then left set so a later call retries.
    pub fn sync(&self) -> Result<()> {
        // Unlocked check first, so a clean engine never waits on the store lock.
        if !self.dirty.load(Ordering::Acquire) {
            return Ok(());
        }
        let mut guard = self.store.lock();
        // Check again under the lock: a clone sharing this flag may have saved
        // while this call was waiting.
        if !self.dirty.load(Ordering::Acquire) {
            return Ok(());
        }
        // A store that was never opened has nothing to save; the flag is still cleared.
        if let Some(store) = guard.as_mut() {
            store
                .save()
                .map_err(|e| anyhow!("failed to sync vector store: {e}"))?;
        }
        self.dirty.store(false, Ordering::Release);
        Ok(())
    }

    /// Runs `f` against the store while holding the store lock, opening the store
    /// from `self.path` first if it has not been opened yet.
    fn with_store<R, F: FnOnce(&mut VecStore) -> Result<R>>(&self, f: F) -> Result<R> {
        let mut guard = self.store.lock();
        if guard.is_none() {
            let opened = VecStore::open(&self.path)
                .map_err(|e| anyhow!("failed to open vector store at {:?}: {e}", self.path))?;
            *guard = Some(opened);
        }
        let store = guard
            .as_mut()
            .expect("vector store is populated by the branch above");
        f(store)
    }
}
/// Rewrites every result's `score` in place as `1.0 - score`.
///
/// NOTE(review): presumably the store reports a distance for which `1 - d` is the
/// matching similarity (e.g. cosine distance) — confirm against the store's metric.
fn distance_to_similarity(results: &mut [SearchResult]) {
    results
        .iter_mut()
        .for_each(|hit| hit.score = 1.0 - hit.score);
}
/// Converts a JSON value into store metadata.
///
/// A JSON object contributes its own key/value pairs as the metadata fields.
/// Any other value is wrapped in a single field named `"value"`.
fn json_to_metadata(json: JsonValue) -> Metadata {
    let fields = if let JsonValue::Object(object) = json {
        object.into_iter().collect()
    } else {
        HashMap::from([("value".to_string(), json)])
    };
    Metadata { fields }
}