use crate::{Collection, HybridQuery, Metadata, Neighbor, Query, VecDatabase, VecStore};
use anyhow::Result;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
/// Async-friendly handle to a [`VecStore`].
///
/// The store lives behind an `Arc<RwLock<_>>`, so cloning this handle is cheap
/// and every clone refers to the same underlying store.
#[derive(Clone)]
pub struct AsyncVecStore {
// Shared, lock-protected synchronous store that all clones operate on.
inner: Arc<RwLock<VecStore>>,
}
impl AsyncVecStore {
pub async fn open<P: Into<PathBuf>>(path: P) -> Result<Self> {
let path = path.into();
let store = tokio::task::spawn_blocking(move || VecStore::open(path)).await??;
Ok(Self {
inner: Arc::new(RwLock::new(store)),
})
}
pub async fn upsert(&self, id: String, vector: Vec<f32>, metadata: Metadata) -> Result<()> {
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let mut store = inner.write().unwrap();
store.upsert(id, vector, metadata)
})
.await?
}
pub async fn batch_upsert(&self, records: Vec<crate::Record>) -> Result<()> {
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let mut store = inner.write().unwrap();
store.batch_upsert(records)
})
.await?
}
pub async fn query(&self, query: Query) -> Result<Vec<Neighbor>> {
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let store = inner.read().unwrap();
store.query(query)
})
.await?
}
pub async fn query_with_filter(
&self,
vector: Vec<f32>,
k: usize,
filter: &str,
) -> Result<Vec<Neighbor>> {
let filter = filter.to_string();
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let store = inner.read().unwrap();
store.query_with_filter(vector, k, &filter)
})
.await?
}
pub async fn remove(&self, id: &str) -> Result<()> {
let id = id.to_string();
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let mut store = inner.write().unwrap();
store.remove(&id)
})
.await?
}
pub async fn save(&self) -> Result<()> {
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let store = inner.read().unwrap();
store.save()
})
.await?
}
pub async fn count(&self) -> usize {
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let store = inner.read().unwrap();
store.count()
})
.await
.unwrap_or(0)
}
pub async fn dimension(&self) -> usize {
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let store = inner.read().unwrap();
store.dimension()
})
.await
.unwrap_or(0)
}
pub async fn create_snapshot(&self, name: &str) -> Result<()> {
let name = name.to_string();
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let store = inner.read().unwrap();
store.create_snapshot(&name)
})
.await?
}
pub async fn list_snapshots(&self) -> Result<Vec<(String, String, usize)>> {
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let store = inner.read().unwrap();
store.list_snapshots()
})
.await?
}
pub async fn restore_snapshot(&self, name: &str) -> Result<()> {
let name = name.to_string();
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let mut store = inner.write().unwrap();
store.restore_snapshot(&name)
})
.await?
}
pub async fn delete_snapshot(&self, name: &str) -> Result<()> {
let name = name.to_string();
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let store = inner.read().unwrap();
store.delete_snapshot(&name)
})
.await?
}
pub fn with_sync<F, R>(&self, f: F) -> Result<R>
where
F: FnOnce(&VecStore) -> Result<R>,
{
let store = self.inner.read().unwrap();
f(&store)
}
pub fn with_sync_mut<F, R>(&self, f: F) -> Result<R>
where
F: FnOnce(&mut VecStore) -> Result<R>,
{
let mut store = self.inner.write().unwrap();
f(&mut store)
}
pub async fn hybrid_query(&self, query: HybridQuery) -> Result<Vec<Neighbor>> {
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let store = inner.read().unwrap();
store.hybrid_query(query)
})
.await?
}
pub async fn index_text(&self, id: &str, text: &str) -> Result<()> {
let id = id.to_string();
let text = text.to_string();
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let mut store = inner.write().unwrap();
store.index_text(&id, &text)
})
.await?
}
}
/// Async-friendly handle to a [`VecDatabase`].
///
/// The database lives behind an `Arc<RwLock<_>>`, so cloning this handle is
/// cheap and every clone refers to the same underlying database.
#[derive(Clone)]
pub struct AsyncVecDatabase {
// Shared, lock-protected synchronous database that all clones operate on.
inner: Arc<RwLock<VecDatabase>>,
}
impl AsyncVecDatabase {
pub async fn open<P: Into<PathBuf>>(path: P) -> Result<Self> {
let path = path.into();
let db = tokio::task::spawn_blocking(move || VecDatabase::open(path)).await??;
Ok(Self {
inner: Arc::new(RwLock::new(db)),
})
}
pub async fn create_collection(&self, name: &str) -> Result<AsyncCollection> {
let name = name.to_string();
let inner = self.inner.clone();
let collection = tokio::task::spawn_blocking(move || {
let mut db = inner.write().unwrap();
db.create_collection(&name)
})
.await??;
Ok(AsyncCollection::new(collection))
}
pub async fn get_collection(&self, name: &str) -> Result<Option<AsyncCollection>> {
let name = name.to_string();
let inner = self.inner.clone();
let collection_opt = tokio::task::spawn_blocking(move || {
let db = inner.read().unwrap();
db.get_collection(&name)
})
.await??;
Ok(collection_opt.map(AsyncCollection::new))
}
pub async fn list_collections(&self) -> Result<Vec<String>> {
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let db = inner.read().unwrap();
db.list_collections()
.map_err(|e| anyhow::anyhow!("List collections failed: {}", e))
})
.await?
}
pub async fn delete_collection(&self, name: &str) -> Result<()> {
let name = name.to_string();
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let mut db = inner.write().unwrap();
db.delete_collection(&name)
.map_err(|e| anyhow::anyhow!("Delete collection failed: {}", e))
})
.await?
}
}
/// Async-friendly handle to a [`Collection`].
///
/// The collection lives behind an `Arc<RwLock<_>>`, so cloning this handle is
/// cheap and every clone refers to the same underlying collection.
#[derive(Clone)]
pub struct AsyncCollection {
// Shared, lock-protected synchronous collection that all clones operate on.
inner: Arc<RwLock<Collection>>,
}
impl AsyncCollection {
    /// Wraps a synchronous [`Collection`] in a shareable, lock-protected handle.
    fn new(collection: Collection) -> Self {
        Self {
            inner: Arc::new(RwLock::new(collection)),
        }
    }

    /// Builds the error reported when the inner lock is poisoned, i.e. when an
    /// earlier operation panicked while holding the write lock.
    fn poisoned() -> anyhow::Error {
        anyhow::anyhow!(
            "AsyncCollection lock is poisoned: a previous write operation panicked while holding it"
        )
    }

    /// Runs `f` with shared (read) access to the collection on the blocking pool.
    ///
    /// A poisoned lock is reported as an error instead of a panic.
    async fn read_blocking<T, F>(&self, f: F) -> Result<T>
    where
        F: FnOnce(&Collection) -> Result<T> + Send + 'static,
        T: Send + 'static,
    {
        let inner = Arc::clone(&self.inner);
        tokio::task::spawn_blocking(move || -> Result<T> {
            let collection = inner.read().map_err(|_| Self::poisoned())?;
            f(&collection)
        })
        .await?
    }

    /// Runs `f` with exclusive (write) access to the collection on the blocking pool.
    ///
    /// A poisoned lock is reported as an error instead of a panic.
    async fn write_blocking<T, F>(&self, f: F) -> Result<T>
    where
        F: FnOnce(&mut Collection) -> Result<T> + Send + 'static,
        T: Send + 'static,
    {
        let inner = Arc::clone(&self.inner);
        tokio::task::spawn_blocking(move || -> Result<T> {
            let mut collection = inner.write().map_err(|_| Self::poisoned())?;
            f(&mut collection)
        })
        .await?
    }

    /// Calls [`Collection::upsert`] with the given id, vector and metadata
    /// under the write lock.
    ///
    /// # Errors
    ///
    /// Returns a "Collection upsert failed" error if the collection call fails,
    /// or an error if the lock is poisoned or the blocking task fails.
    pub async fn upsert(&self, id: String, vector: Vec<f32>, metadata: Metadata) -> Result<()> {
        self.write_blocking(move |collection| {
            collection
                .upsert(id, vector, metadata)
                .map_err(|e| anyhow::anyhow!("Collection upsert failed: {}", e))
        })
        .await
    }

    /// Calls [`Collection::query`] under the read lock.
    ///
    /// # Errors
    ///
    /// Returns a "Collection query failed" error if the collection call fails,
    /// or an error if the lock is poisoned or the blocking task fails.
    pub async fn query(&self, query: Query) -> Result<Vec<Neighbor>> {
        self.read_blocking(move |collection| {
            collection
                .query(query)
                .map_err(|e| anyhow::anyhow!("Collection query failed: {}", e))
        })
        .await
    }

    /// Calls [`Collection::delete`] for `id` under the write lock.
    ///
    /// # Errors
    ///
    /// Returns a "Collection delete failed" error if the collection call fails,
    /// or an error if the lock is poisoned or the blocking task fails.
    pub async fn delete(&self, id: &str) -> Result<()> {
        // The blocking task must own its data, so the borrowed id is copied.
        let id = id.to_string();
        self.write_blocking(move |collection| {
            collection
                .delete(&id)
                .map_err(|e| anyhow::anyhow!("Collection delete failed: {}", e))
        })
        .await
    }

    /// Returns the result of [`Collection::count`], obtained under the read lock.
    ///
    /// # Errors
    ///
    /// Returns a "Collection count failed" error if the collection call fails,
    /// or an error if the lock is poisoned or the blocking task fails.
    pub async fn count(&self) -> Result<usize> {
        self.read_blocking(|collection| {
            collection
                .count()
                .map_err(|e| anyhow::anyhow!("Collection count failed: {}", e))
        })
        .await
    }

    /// Returns the result of [`Collection::stats`], obtained under the read lock.
    ///
    /// # Errors
    ///
    /// Returns a "Collection stats failed" error if the collection call fails,
    /// or an error if the lock is poisoned or the blocking task fails.
    pub async fn stats(&self) -> Result<crate::namespace_manager::NamespaceStats> {
        self.read_blocking(|collection| {
            collection
                .stats()
                .map_err(|e| anyhow::anyhow!("Collection stats failed: {}", e))
        })
        .await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Metadata value with no fields, used where the test does not care about metadata.
    fn empty_metadata() -> Metadata {
        Metadata {
            fields: HashMap::new(),
        }
    }

    /// Unfiltered top-`k` query for `vector`.
    fn unfiltered(vector: Vec<f32>, k: usize) -> Query {
        Query {
            vector,
            k,
            filter: None,
        }
    }

    /// Upserting two vectors makes both countable, and the vector identical to
    /// the query comes back first.
    #[tokio::test]
    async fn test_async_basic_operations() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = AsyncVecStore::open(temp_dir.path()).await.unwrap();

        let first = store.upsert("doc1".into(), vec![1.0, 0.0, 0.0], empty_metadata());
        first.await.unwrap();
        let second = store.upsert("doc2".into(), vec![0.0, 1.0, 0.0], empty_metadata());
        second.await.unwrap();

        assert_eq!(store.count().await, 2);

        let results = store
            .query(unfiltered(vec![1.0, 0.0, 0.0], 1))
            .await
            .unwrap();
        assert!(!results.is_empty());
        assert_eq!(results[0].id, "doc1");
    }

    /// Three queries issued through cloned handles and awaited together all succeed.
    #[tokio::test]
    async fn test_async_concurrent_queries() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = AsyncVecStore::open(temp_dir.path()).await.unwrap();

        for i in 0..10 {
            let id = format!("doc{}", i);
            store
                .upsert(id, vec![i as f32, 0.0, 0.0], empty_metadata())
                .await
                .unwrap();
        }

        // Each query goes through its own clone of the handle.
        let (handle_a, handle_b, handle_c) = (store.clone(), store.clone(), store.clone());
        let (r1, r2, r3) = tokio::join!(
            handle_a.query(unfiltered(vec![5.0, 0.0, 0.0], 3)),
            handle_b.query(unfiltered(vec![2.0, 0.0, 0.0], 3)),
            handle_c.query(unfiltered(vec![8.0, 0.0, 0.0], 3)),
        );

        assert!(r1.is_ok());
        assert!(r2.is_ok());
        assert!(r3.is_ok());
    }

    /// A `value >= 5` filter returns at least one hit and only hits whose
    /// `value` field satisfies the filter.
    #[tokio::test]
    async fn test_async_filter_query() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = AsyncVecStore::open(temp_dir.path()).await.unwrap();

        for i in 0..10 {
            let mut meta = empty_metadata();
            meta.fields.insert("value".into(), serde_json::json!(i));
            let id = format!("doc{}", i);
            store
                .upsert(id, vec![i as f32, 0.0, 0.0], meta)
                .await
                .unwrap();
        }

        let results = store
            .query_with_filter(vec![5.0, 0.0, 0.0], 10, "value >= 5")
            .await
            .unwrap();
        assert!(!results.is_empty());

        for neighbor in &results {
            let value = neighbor
                .metadata
                .fields
                .get("value")
                .and_then(|field| field.as_i64())
                .unwrap();
            assert!(value >= 5);
        }
    }

    /// A snapshot can be created, restored (dropping later writes), listed and deleted.
    #[tokio::test]
    async fn test_async_snapshots() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = AsyncVecStore::open(temp_dir.path()).await.unwrap();

        // State captured by the snapshot: a single document.
        store
            .upsert("doc1".into(), vec![1.0, 0.0, 0.0], empty_metadata())
            .await
            .unwrap();
        store.create_snapshot("test-snapshot").await.unwrap();

        // A write made after the snapshot.
        store
            .upsert("doc2".into(), vec![2.0, 0.0, 0.0], empty_metadata())
            .await
            .unwrap();
        assert_eq!(store.count().await, 2);

        store.restore_snapshot("test-snapshot").await.unwrap();
        assert_eq!(store.count().await, 1);

        let before_delete = store.list_snapshots().await.unwrap();
        assert_eq!(before_delete.len(), 1);

        store.delete_snapshot("test-snapshot").await.unwrap();
        let after_delete = store.list_snapshots().await.unwrap();
        assert_eq!(after_delete.len(), 0);
    }
}