use std::sync::Arc;
use tempfile::TempDir;
use umi_memory::constants::EMBEDDING_DIMENSIONS_COUNT;
use umi_memory::storage::{LanceVectorBackend, SimVectorBackend, VectorBackend};
/// Builds the simulated vector backend used as the reference implementation,
/// constructed from the given `seed`.
fn create_sim_backend(seed: u64) -> SimVectorBackend {
    SimVectorBackend::new(seed)
}
/// Connects a `LanceVectorBackend` to a freshly created temporary directory.
///
/// The `TempDir` is returned alongside the backend so the caller controls how
/// long the directory handle stays alive.
///
/// # Panics
/// Panics if the temporary directory cannot be created, its path is not valid
/// UTF-8, or the connection fails.
async fn create_lance_backend() -> (LanceVectorBackend, TempDir) {
    let workdir = TempDir::new().unwrap();
    let location = workdir.path().to_str().unwrap();
    let backend = LanceVectorBackend::connect(location).await.unwrap();
    (backend, workdir)
}
/// Connects a `LanceVectorBackend` to `path`.
///
/// Tests call this repeatedly with the same path to obtain a fresh connection
/// over data written by an earlier one.
///
/// # Panics
/// Panics if the connection fails.
async fn create_lance_backend_at_path(path: &str) -> LanceVectorBackend {
    let connection = LanceVectorBackend::connect(path).await;
    connection.unwrap()
}
fn generate_embedding(seed: u64, index: usize) -> Vec<f32> {
let mut emb = vec![0.0; EMBEDDING_DIMENSIONS_COUNT];
for i in 0..EMBEDDING_DIMENSIONS_COUNT {
emb[i] = ((seed + i as u64 + index as u64) % 1000) as f32 / 1000.0;
}
emb
}
/// Stores two embeddings through one backend connection, then opens a fresh
/// connection at the same path and checks that both entries, and the exact
/// vector stored for `entity1`, are still there.
#[tokio::test]
async fn dst_lance_persistence_store_and_retrieve() {
    let workdir = TempDir::new().unwrap();
    let db_path = workdir.path().to_str().unwrap();

    // First connection: write both entities and check them in place.
    {
        let writer = create_lance_backend_at_path(db_path).await;
        let first = generate_embedding(42, 1);
        let second = generate_embedding(42, 2);
        writer.store("entity1", &first).await.unwrap();
        writer.store("entity2", &second).await.unwrap();
        for id in ["entity1", "entity2"] {
            assert!(writer.exists(id).await.unwrap());
        }
        assert_eq!(writer.count().await.unwrap(), 2);
    }

    // Second connection over the same path: the data must still be visible.
    {
        let reader = create_lance_backend_at_path(db_path).await;
        for id in ["entity1", "entity2"] {
            assert!(reader.exists(id).await.unwrap());
        }
        assert_eq!(reader.count().await.unwrap(), 2);
        let stored = reader.get("entity1").await.unwrap().unwrap();
        assert_eq!(stored, generate_embedding(42, 1));
    }
}
/// Writes `entity1`, overwrites it from a second connection, and checks from a
/// third connection that the overwriting embedding is the one returned.
#[tokio::test]
async fn dst_lance_persistence_update_across_restarts() {
    let workdir = TempDir::new().unwrap();
    let db_path = workdir.path().to_str().unwrap();

    // Connection 1: initial value.
    {
        let session = create_lance_backend_at_path(db_path).await;
        let original = generate_embedding(42, 1);
        session.store("entity1", &original).await.unwrap();
    }

    // Connection 2: store a different embedding under the same id.
    {
        let session = create_lance_backend_at_path(db_path).await;
        let replacement = generate_embedding(42, 2);
        session.store("entity1", &replacement).await.unwrap();
    }

    // Connection 3: the replacement must be what comes back.
    {
        let session = create_lance_backend_at_path(db_path).await;
        let current = session.get("entity1").await.unwrap().unwrap();
        assert_eq!(current, generate_embedding(42, 2));
    }
}
/// Stores two entities, deletes one from a second connection, and checks from
/// a third connection that only the other entity remains.
#[tokio::test]
async fn dst_lance_persistence_delete_across_restarts() {
    let workdir = TempDir::new().unwrap();
    let db_path = workdir.path().to_str().unwrap();

    // Connection 1: populate.
    {
        let session = create_lance_backend_at_path(db_path).await;
        let first = generate_embedding(42, 1);
        let second = generate_embedding(42, 2);
        session.store("entity1", &first).await.unwrap();
        session.store("entity2", &second).await.unwrap();
    }

    // Connection 2: remove one entry.
    {
        let session = create_lance_backend_at_path(db_path).await;
        session.delete("entity1").await.unwrap();
    }

    // Connection 3: the deletion is visible and the other entry is untouched.
    {
        let session = create_lance_backend_at_path(db_path).await;
        assert!(!session.exists("entity1").await.unwrap());
        assert!(session.exists("entity2").await.unwrap());
        assert_eq!(session.count().await.unwrap(), 1);
    }
}
/// Searching a backend that has had nothing stored in it returns no results,
/// for both the simulated and the Lance backend.
#[tokio::test]
async fn dst_lance_behavior_matches_sim_empty_search() {
    let sim = create_sim_backend(42);
    let (lance, _temp) = create_lance_backend().await;

    let probe = generate_embedding(42, 999);
    let from_sim = sim.search(&probe, 10).await.unwrap();
    let from_lance = lance.search(&probe, 10).await.unwrap();

    assert_eq!(from_sim.len(), 0);
    assert_eq!(from_lance.len(), 0);
}
/// After storing the same two entities in both backends, `count` and `exists`
/// must report the same answers from each.
#[tokio::test]
async fn dst_lance_behavior_matches_sim_store_and_count() {
    let sim = create_sim_backend(42);
    let (lance, _temp) = create_lance_backend().await;

    let first = generate_embedding(42, 1);
    let second = generate_embedding(42, 2);

    // Populate the simulated backend first, then the Lance backend.
    sim.store("entity1", &first).await.unwrap();
    sim.store("entity2", &second).await.unwrap();
    lance.store("entity1", &first).await.unwrap();
    lance.store("entity2", &second).await.unwrap();

    let sim_total = sim.count().await.unwrap();
    let lance_total = lance.count().await.unwrap();
    assert_eq!(sim_total, lance_total);

    for id in ["entity1", "entity2"] {
        assert_eq!(
            sim.exists(id).await.unwrap(),
            lance.exists(id).await.unwrap()
        );
    }
}
/// With a single stored entity, searching for its own embedding returns
/// exactly that entity from both backends.
#[tokio::test]
async fn dst_lance_behavior_matches_sim_search_returns_stored() {
    let sim = create_sim_backend(42);
    let (lance, _temp) = create_lance_backend().await;

    let vector = generate_embedding(42, 1);
    sim.store("entity1", &vector).await.unwrap();
    lance.store("entity1", &vector).await.unwrap();

    let from_sim = sim.search(&vector, 10).await.unwrap();
    let from_lance = lance.search(&vector, 10).await.unwrap();

    // Exactly one hit from each backend...
    assert_eq!(from_sim.len(), 1);
    assert_eq!(from_lance.len(), 1);
    // ...and it is the entity that was stored.
    assert_eq!(from_sim[0].id, "entity1");
    assert_eq!(from_lance[0].id, "entity1");
}
/// Deleting the only stored entity leaves both backends empty: `exists`
/// reports `false` and `count` reports zero.
#[tokio::test]
async fn dst_lance_behavior_matches_sim_delete() {
    let sim = create_sim_backend(42);
    let (lance, _temp) = create_lance_backend().await;

    let emb1 = generate_embedding(42, 1);
    sim.store("entity1", &emb1).await.unwrap();
    lance.store("entity1", &emb1).await.unwrap();

    sim.delete("entity1").await.unwrap();
    lance.delete("entity1").await.unwrap();

    // `assert!(!…)` rather than `assert_eq!(…, false)`, matching the other
    // negative existence checks in this file.
    assert!(!sim.exists("entity1").await.unwrap());
    assert!(!lance.exists("entity1").await.unwrap());
    assert_eq!(sim.count().await.unwrap(), 0);
    assert_eq!(lance.count().await.unwrap(), 0);
}
/// Spawns three tasks that each store a distinct entity through a shared
/// backend, then checks that all three entities are present.
#[tokio::test]
async fn dst_lance_concurrent_stores() {
    let (lance, _temp) = create_lance_backend().await;

    // Store and delete a throwaway entry before spawning writers.
    // NOTE(review): presumably this forces backend/table initialisation ahead
    // of the concurrent stores; confirm against LanceVectorBackend.
    let warmup = generate_embedding(42, 999);
    lance.store("_init", &warmup).await.unwrap();
    lance.delete("_init").await.unwrap();

    let lance = Arc::new(lance);

    // All tasks are spawned before any of them is awaited.
    let writers: Vec<_> = (0..3)
        .map(|i| {
            let backend = Arc::clone(&lance);
            tokio::spawn(async move {
                let emb = generate_embedding(42, i);
                let id = format!("entity{}", i);
                backend.store(&id, &emb).await.unwrap();
            })
        })
        .collect();

    for writer in writers {
        writer.await.unwrap();
    }

    assert_eq!(lance.count().await.unwrap(), 3);
    for id in ["entity0", "entity1", "entity2"] {
        assert!(lance.exists(id).await.unwrap());
    }
}
/// Spawns five tasks that each call `exists` and `get` for the same stored
/// entity, and checks every task sees the entity with the stored embedding.
#[tokio::test]
async fn dst_lance_concurrent_reads() {
    let (lance, _temp) = create_lance_backend().await;

    let stored = generate_embedding(42, 1);
    lance.store("entity1", &stored).await.unwrap();

    let lance = Arc::new(lance);

    // All reader tasks are spawned before any of them is awaited.
    let readers: Vec<_> = (0..5)
        .map(|_| {
            let backend = Arc::clone(&lance);
            tokio::spawn(async move {
                let exists = backend.exists("entity1").await.unwrap();
                let emb = backend.get("entity1").await.unwrap();
                (exists, emb)
            })
        })
        .collect();

    for reader in readers {
        let (found, vector) = reader.await.unwrap();
        assert!(found);
        assert!(vector.is_some());
        assert_eq!(vector.unwrap(), generate_embedding(42, 1));
    }
}
/// Runs two store tasks, two `count` tasks and two `search` tasks against a
/// shared backend, requires every task to finish without error, and checks
/// that both written entities exist afterwards.
#[tokio::test]
async fn dst_lance_concurrent_mixed_operations() {
    let (lance, _temp) = create_lance_backend().await;

    // Store and delete a throwaway entry before spawning tasks.
    // NOTE(review): presumably this forces backend/table initialisation ahead
    // of the concurrent work; confirm against LanceVectorBackend.
    let warmup = generate_embedding(42, 999);
    lance.store("_init", &warmup).await.unwrap();
    lance.delete("_init").await.unwrap();

    let lance = Arc::new(lance);
    let mut tasks = Vec::new();

    // Writers.
    tasks.extend((0..2).map(|i| {
        let backend = Arc::clone(&lance);
        tokio::spawn(async move {
            let emb = generate_embedding(42, i);
            let id = format!("writer{}", i);
            backend.store(&id, &emb).await.unwrap();
        })
    }));

    // Counters: only success matters, the value is discarded.
    tasks.extend((0..2).map(|_| {
        let backend = Arc::clone(&lance);
        tokio::spawn(async move {
            let _ = backend.count().await.unwrap();
        })
    }));

    // Searchers: only success matters, the results are discarded.
    tasks.extend((0..2).map(|_| {
        let backend = Arc::clone(&lance);
        tokio::spawn(async move {
            let query = generate_embedding(42, 999);
            let _ = backend.search(&query, 10).await.unwrap();
        })
    }));

    for task in tasks {
        task.await.unwrap();
    }

    for id in ["writer0", "writer1"] {
        assert!(lance.exists(id).await.unwrap());
    }
}
/// Stores 100 distinct entities one after another, checks the count, and
/// checks that a search with limit 10 returns between 1 and 10 results.
#[tokio::test]
async fn dst_lance_large_batch_operations() {
    let (lance, _temp) = create_lance_backend().await;

    for n in 0..100 {
        let id = format!("entity{}", n);
        let vector = generate_embedding(42, n);
        lance.store(&id, &vector).await.unwrap();
    }
    assert_eq!(lance.count().await.unwrap(), 100);

    let probe = generate_embedding(42, 50);
    let hits = lance.search(&probe, 10).await.unwrap();
    assert!(!hits.is_empty());
    assert!(hits.len() <= 10);
}
/// Stores ten different embeddings under the same id and checks that only one
/// entry exists and that it holds the last embedding written.
#[tokio::test]
async fn dst_lance_update_same_id_multiple_times() {
    let (lance, _temp) = create_lance_backend().await;

    for round in 0..10 {
        let vector = generate_embedding(42, round);
        lance.store("entity1", &vector).await.unwrap();
    }

    assert_eq!(lance.count().await.unwrap(), 1);

    // The final iteration used index 9.
    let current = lance.get("entity1").await.unwrap().unwrap();
    assert_eq!(current, generate_embedding(42, 9));
}
/// Deleting an id that was never stored must succeed and leave the backend
/// empty.
#[tokio::test]
async fn dst_lance_delete_nonexistent() {
    let (lance, _temp) = create_lance_backend().await;

    lance.delete("nonexistent").await.unwrap();

    let still_present = lance.exists("nonexistent").await.unwrap();
    assert!(!still_present);
    assert_eq!(lance.count().await.unwrap(), 0);
}
/// Fetching an id that was never stored succeeds and yields `None`.
#[tokio::test]
async fn dst_lance_get_nonexistent() {
    let (lance, _temp) = create_lance_backend().await;

    let lookup = lance.get("nonexistent").await.unwrap();
    assert!(lookup.is_none());
}
/// With 20 entities stored, a search with limit 5 returns at most 5 results.
#[tokio::test]
async fn dst_lance_search_limit_respected() {
    let (lance, _temp) = create_lance_backend().await;

    for n in 0..20 {
        let id = format!("entity{}", n);
        let vector = generate_embedding(42, n);
        lance.store(&id, &vector).await.unwrap();
    }

    let probe = generate_embedding(42, 10);
    let hits = lance.search(&probe, 5).await.unwrap();
    // Only the upper bound is checked; an empty result would also pass.
    assert!(hits.len() <= 5);
}
/// Stores ten entities, then reads them all back twice, each time through a
/// fresh connection to the same path, and checks both reads are identical.
#[tokio::test]
async fn dst_lance_deterministic_storage_retrieval() {
    let workdir = TempDir::new().unwrap();
    let db_path = workdir.path().to_str().unwrap();

    // Populate through a connection that is dropped at the end of the scope.
    {
        let writer = create_lance_backend_at_path(db_path).await;
        for n in 0..10 {
            let id = format!("entity{}", n);
            let vector = generate_embedding(42, n);
            writer.store(&id, &vector).await.unwrap();
        }
    }

    // Two independent read passes, each over its own connection.
    let mut snapshots = Vec::new();
    for _ in 0..2 {
        let reader = create_lance_backend_at_path(db_path).await;
        let mut entries = Vec::new();
        for n in 0..10 {
            let id = format!("entity{}", n);
            let vector = reader.get(&id).await.unwrap().unwrap();
            entries.push((id, vector));
        }
        snapshots.push(entries);
    }

    assert_eq!(snapshots[0], snapshots[1]);
}
/// Tracks `count` through a fixed sequence of stores, an overwrite, and
/// deletes, asserting the expected value after every step.
#[tokio::test]
async fn dst_lance_deterministic_count_after_operations() {
    let (lance, _temp) = create_lance_backend().await;
    let first = generate_embedding(42, 1);
    let second = generate_embedding(42, 2);

    // Two new ids: the count grows by one each time.
    lance.store("e1", &first).await.unwrap();
    assert_eq!(lance.count().await.unwrap(), 1);
    lance.store("e2", &second).await.unwrap();
    assert_eq!(lance.count().await.unwrap(), 2);

    // Storing under an existing id must not change the count.
    lance.store("e1", &second).await.unwrap();
    assert_eq!(lance.count().await.unwrap(), 2);

    // Each delete drops the count by one, down to empty.
    lance.delete("e1").await.unwrap();
    assert_eq!(lance.count().await.unwrap(), 1);
    lance.delete("e2").await.unwrap();
    assert_eq!(lance.count().await.unwrap(), 0);
}