use std::sync::Arc;
use common::{
DakeraError, DecayConfig, DecayStrategy, DistanceMetric, Memory, MemoryType, QueryRequest,
Vector,
};
use engine::{DecayEngine, SearchEngine};
use storage::{InMemoryStorage, VectorStorage};
/// Builds a plain `Vector` with the given id and embedding, carrying
/// no metadata, TTL, or expiry.
fn bare_vec(id: &str, values: Vec<f32>) -> Vector {
    Vector {
        metadata: None,
        ttl_seconds: None,
        expires_at: None,
        id: id.to_owned(),
        values,
    }
}
/// Builds a `QueryRequest` using cosine distance with every optional
/// feature (metadata, vectors, filter, cursor, staleness) disabled.
fn cosine_query(vector: Vec<f32>, top_k: usize) -> QueryRequest {
    QueryRequest {
        distance_metric: DistanceMetric::Cosine,
        include_metadata: false,
        include_vectors: false,
        filter: None,
        cursor: None,
        consistency: Default::default(),
        staleness_config: None,
        vector,
        top_k,
    }
}
/// Creates a fresh in-memory storage plus a search engine over it,
/// ensuring namespace `ns` exists before returning both handles.
async fn make_engine(ns: &str) -> (SearchEngine<InMemoryStorage>, Arc<InMemoryStorage>) {
    let storage = Arc::new(InMemoryStorage::new());
    storage.ensure_namespace(&ns.to_string()).await.unwrap();
    let engine = SearchEngine::new(Arc::clone(&storage));
    (engine, storage)
}
/// Returns the Unix timestamp (seconds) from `n` seconds in the past,
/// saturating at zero rather than underflowing.
fn secs_ago(n: u64) -> u64 {
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs();
    now.saturating_sub(n)
}
/// Builds a memory-backed `Vector`: an episodic `Memory` whose created
/// and last-accessed timestamps lie `last_accessed_secs_ago` seconds in
/// the past, embedded as the fixed vector [0.1, 0.2, 0.3].
fn memory_vector(id: &str, agent_id: &str, last_accessed_secs_ago: u64, importance: f32) -> Vector {
    let timestamp = secs_ago(last_accessed_secs_ago);
    Memory {
        id: id.to_owned(),
        memory_type: MemoryType::Episodic,
        content: "test content".to_string(),
        agent_id: agent_id.to_owned(),
        session_id: None,
        importance,
        tags: Vec::new(),
        metadata: None,
        created_at: timestamp,
        last_accessed_at: timestamp,
        access_count: 0,
        ttl_seconds: None,
        expires_at: None,
    }
    .to_vector(vec![0.1, 0.2, 0.3])
}
/// A `DecayEngine` configured to decay aggressively: exponential
/// strategy with a one-hour half-life, floored at importance 0.5.
fn fast_decay() -> DecayEngine {
    let config = DecayConfig {
        min_importance: 0.5,
        half_life_hours: 1.0,
        strategy: DecayStrategy::Exponential,
    };
    DecayEngine::new(config)
}
// Three orthogonal unit vectors; querying along the x-axis must return
// "a" with a cosine score of ~1.0.
#[tokio::test]
async fn upsert_and_query_returns_nearest() {
    let ns = "int_uq_nearest";
    let (engine, storage) = make_engine(ns).await;
    let batch = vec![
        bare_vec("a", vec![1.0, 0.0, 0.0]),
        bare_vec("b", vec![0.0, 1.0, 0.0]),
        bare_vec("c", vec![0.0, 0.0, 1.0]),
    ];
    storage.upsert(&ns.to_string(), batch).await.unwrap();
    let query = cosine_query(vec![1.0, 0.0, 0.0], 1);
    let resp = engine.search(&ns.to_string(), &query).await.unwrap();
    assert_eq!(resp.results.len(), 1);
    let top = &resp.results[0];
    assert_eq!(top.id, "a");
    assert!(top.score > 0.99, "expected score ~1.0, got {}", top.score);
}
// Writing the same id twice must replace the vector in place rather
// than adding a second entry.
#[tokio::test]
async fn upsert_overwrites_same_id() {
    let ns = "int_uq_overwrite";
    let (engine, storage) = make_engine(ns).await;
    for values in [vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]] {
        storage
            .upsert(&ns.to_string(), vec![bare_vec("v1", values)])
            .await
            .unwrap();
    }
    let count = storage.count(&ns.to_string()).await.unwrap();
    assert_eq!(count, 1, "upsert must not grow count");
    let resp = engine
        .search(&ns.to_string(), &cosine_query(vec![0.0, 1.0, 0.0], 1))
        .await
        .unwrap();
    let top = &resp.results[0];
    assert_eq!(top.id, "v1");
    assert!(top.score > 0.99, "score={}", top.score);
}
// After deleting "gone", a search that would have matched it must only
// surface the surviving vector.
#[tokio::test]
async fn deleted_vector_absent_from_search() {
    let ns = "int_uq_delete";
    let (engine, storage) = make_engine(ns).await;
    let batch = vec![
        bare_vec("keep", vec![1.0, 0.0, 0.0]),
        bare_vec("gone", vec![1.0, 0.0, 0.0]),
    ];
    storage.upsert(&ns.to_string(), batch).await.unwrap();
    storage
        .delete(&ns.to_string(), &["gone".to_string()])
        .await
        .unwrap();
    let resp = engine
        .search(&ns.to_string(), &cosine_query(vec![1.0, 0.0, 0.0], 10))
        .await
        .unwrap();
    let no_deleted = resp.results.iter().all(|r| r.id != "gone");
    assert!(
        no_deleted,
        "deleted vector 'gone' appeared in search results"
    );
}
// Searching a namespace that was never created must surface
// `NamespaceNotFound` rather than an empty result set.
#[tokio::test]
async fn search_nonexistent_namespace_errors() {
    let engine = SearchEngine::new(Arc::new(InMemoryStorage::new()));
    let query = cosine_query(vec![1.0, 0.0, 0.0], 5);
    let err = engine
        .search(&"no_such_ns".to_string(), &query)
        .await
        .unwrap_err();
    assert!(
        matches!(err, DakeraError::NamespaceNotFound(_)),
        "expected NamespaceNotFound, got {:?}",
        err
    );
}
// The namespace holds 3-dimensional vectors, so a 2-dimensional query
// must be rejected with a `DimensionMismatch` carrying both sizes.
#[tokio::test]
async fn search_dimension_mismatch_errors() {
    let ns = "int_uq_dim";
    let (engine, storage) = make_engine(ns).await;
    storage
        .upsert(&ns.to_string(), vec![bare_vec("v1", vec![1.0, 0.0, 0.0])])
        .await
        .unwrap();
    let query = cosine_query(vec![1.0, 0.0], 5);
    let err = engine.search(&ns.to_string(), &query).await.unwrap_err();
    let is_mismatch = matches!(
        err,
        DakeraError::DimensionMismatch {
            expected: 3,
            actual: 2
        }
    );
    assert!(is_mismatch, "expected DimensionMismatch, got {:?}", err);
}
// An existing-but-empty namespace is not an error: the search succeeds
// and simply yields no results.
#[tokio::test]
async fn search_empty_namespace_returns_empty_results() {
    let ns = "int_uq_empty";
    let (engine, _storage) = make_engine(ns).await;
    let query = cosine_query(vec![1.0, 0.0, 0.0], 5);
    let resp = engine.search(&ns.to_string(), &query).await.unwrap();
    assert!(resp.results.is_empty());
}
// Batch-inserts 100 distinct unit vectors around the unit circle (in
// the z=0 plane) and verifies that querying with v42's exact embedding
// returns v42 as the top result with a near-perfect cosine score.
#[tokio::test]
async fn batch_100_vectors_exact_match_is_top_result() {
let ns = "int_batch_100";
let (engine, storage) = make_engine(ns).await;
// id "v{i}" embeds as (cos i, sin i, 0) — all distinct directions.
let vectors: Vec<Vector> = (0..100u32)
.map(|i| {
let a = i as f32;
bare_vec(&format!("v{}", i), vec![a.cos(), a.sin(), 0.0])
})
.collect();
let n = storage.upsert(&ns.to_string(), vectors).await.unwrap();
assert_eq!(n, 100);
assert_eq!(storage.count(&ns.to_string()).await.unwrap(), 100);
// Reuse the exact angle that produced "v42" as the query vector.
let a = 42.0f32;
let resp = engine
.search(
&ns.to_string(),
&cosine_query(vec![a.cos(), a.sin(), 0.0], 5),
)
.await
.unwrap();
assert_eq!(resp.results.len(), 5);
assert_eq!(
resp.results[0].id, "v42",
"expected v42 at top, got {}",
resp.results[0].id
);
assert!(
resp.results[0].score > 0.999,
"expected near-perfect score for exact match, got {}",
resp.results[0].score
);
}
// Two batches overlap on ids v25..v49; the union of ids is v0..v74, so
// the final count must be 75 (overlapping ids are overwritten, not
// duplicated).
#[tokio::test]
async fn batch_upsert_overlapping_ids_correct_count() {
    let ns = "int_batch_overlap";
    let (_engine, storage) = make_engine(ns).await;
    let batch1: Vec<Vector> = (0..50u32)
        .map(|i| {
            let angle = i as f32;
            bare_vec(&format!("v{}", i), vec![angle.cos(), angle.sin()])
        })
        .collect();
    storage.upsert(&ns.to_string(), batch1).await.unwrap();
    // Same ids for 25..50, but different embeddings (angle offset by 100).
    let batch2: Vec<Vector> = (25..75u32)
        .map(|i| {
            let angle = (i + 100) as f32;
            bare_vec(&format!("v{}", i), vec![angle.cos(), angle.sin()])
        })
        .collect();
    storage.upsert(&ns.to_string(), batch2).await.unwrap();
    let total = storage.count(&ns.to_string()).await.unwrap();
    assert_eq!(total, 75, "expected 75 unique vectors, got {}", total);
}
// Asking for top_k=10 when only 3 vectors exist must return exactly 3
// results — no padding and no error.
#[tokio::test]
async fn top_k_capped_at_available_vector_count() {
    let ns = "int_batch_topk";
    let (engine, storage) = make_engine(ns).await;
    let batch = vec![
        bare_vec("a", vec![1.0, 0.0, 0.0]),
        bare_vec("b", vec![0.0, 1.0, 0.0]),
        bare_vec("c", vec![0.0, 0.0, 1.0]),
    ];
    storage.upsert(&ns.to_string(), batch).await.unwrap();
    let resp = engine
        .search(&ns.to_string(), &cosine_query(vec![1.0, 0.0, 0.0], 10))
        .await
        .unwrap();
    assert_eq!(
        resp.results.len(),
        3,
        "results must be capped at available count (3), got {}",
        resp.results.len()
    );
}
#[tokio::test]
async fn search_includes_vectors_when_requested() {
let ns = "int_batch_include_vec";
let (engine, storage) = make_engine(ns).await;
storage
.upsert(&ns.to_string(), vec![bare_vec("v1", vec![1.0, 0.0, 0.0])])
.await
.unwrap();
let resp = engine
.search(
&ns.to_string(),
&QueryRequest {
vector: vec![1.0, 0.0, 0.0],
top_k: 1,
distance_metric: DistanceMetric::Cosine,
include_metadata: false,
include_vectors: true,
filter: None,
cursor: None,
consistency: Default::default(),
staleness_config: None,
},
)
.await
.unwrap();
assert_eq!(resp.results.len(), 1);
let vector_data = resp.results[0]
.vector
.as_ref()
.expect("vector data should be present");
assert_eq!(vector_data, &vec![1.0f32, 0.0, 0.0]);
}
// A memory last accessed ~200 hours ago under a 1-hour half-life has
// decayed far below min_importance; the engine must floor its
// importance at 0.5 in place rather than hard-deleting the record.
#[tokio::test]
async fn decay_sweeps_stale_memory_below_min_importance() {
let ns = "_dakera_agent_decay_stale";
let storage: Arc<dyn VectorStorage> = Arc::new(InMemoryStorage::new());
storage.ensure_namespace(&ns.to_string()).await.unwrap();
let stale = memory_vector("stale_mem", "agent1", 200 * 3600, 0.8);
storage.upsert(&ns.to_string(), vec![stale]).await.unwrap();
// NOTE(review): the empty map presumably carries per-agent/namespace
// config overrides — confirm against DecayEngine::apply_decay.
let result = fast_decay()
.apply_decay(&storage, &std::collections::HashMap::new())
.await;
assert_eq!(result.namespaces_processed, 1);
assert_eq!(
result.memories_deleted, 0,
"decay engine must not hard-delete memories; deleted={}",
result.memories_deleted
);
assert_eq!(
result.memories_floored, 1,
"stale memory must be floored at min_importance; floored={}",
result.memories_floored
);
let remaining = storage.get_all(&ns.to_string()).await.unwrap();
assert_eq!(
remaining.len(),
1,
"floored memory must remain in storage (no hard-delete)"
);
let mem = Memory::from_vector(&remaining[0]).expect("must be a valid memory vector");
assert!(
(mem.importance - 0.5).abs() < 0.001,
"importance must be floored at min_importance=0.5; got {}",
mem.importance
);
}
// A memory touched one second ago has effectively no elapsed decay
// time, so a sweep must leave it in place.
#[tokio::test]
async fn decay_preserves_recently_accessed_memory() {
    let ns = "_dakera_agent_decay_fresh";
    let storage: Arc<dyn VectorStorage> = Arc::new(InMemoryStorage::new());
    storage.ensure_namespace(&ns.to_string()).await.unwrap();
    storage
        .upsert(
            &ns.to_string(),
            vec![memory_vector("fresh_mem", "agent1", 1, 0.9)],
        )
        .await
        .unwrap();
    let result = fast_decay()
        .apply_decay(&storage, &std::collections::HashMap::new())
        .await;
    assert_eq!(
        result.memories_deleted, 0,
        "recently accessed memory must not be deleted"
    );
    let remaining = storage.get_all(&ns.to_string()).await.unwrap();
    assert_eq!(remaining.len(), 1, "fresh memory should still be present");
}
// The decay sweep must only touch agent-memory namespaces (those with
// the `_dakera_agent_` prefix, as exercised here) and leave plain
// vector namespaces completely untouched.
#[tokio::test]
async fn decay_only_processes_agent_namespaces() {
let agent_ns = "_dakera_agent_decay_scope";
let plain_ns = "regular_ns_decay_scope";
let storage: Arc<dyn VectorStorage> = Arc::new(InMemoryStorage::new());
storage
.ensure_namespace(&agent_ns.to_string())
.await
.unwrap();
storage
.ensure_namespace(&plain_ns.to_string())
.await
.unwrap();
// Identical stale memories in both namespaces — only the agent one
// should be visited by the sweep.
storage
.upsert(
&agent_ns.to_string(),
vec![memory_vector("stale_agent", "agent1", 200 * 3600, 0.8)],
)
.await
.unwrap();
storage
.upsert(
&plain_ns.to_string(),
vec![memory_vector("stale_plain", "agent1", 200 * 3600, 0.8)],
)
.await
.unwrap();
let result = fast_decay()
.apply_decay(&storage, &std::collections::HashMap::new())
.await;
assert_eq!(
result.namespaces_processed, 1,
"only _dakera_agent_ namespaces should be processed"
);
let plain_count = storage.get_all(&plain_ns.to_string()).await.unwrap().len();
assert_eq!(
plain_count, 1,
"plain namespace should not be modified by decay engine"
);
}
// Mixed sweep: 3 stale memories and 2 fresh ones. The engine must
// evaluate all 5, floor the stale ones at min_importance, and never
// hard-delete anything — storage still holds all 5 afterwards.
//
// NOTE: renamed from `decay_deletes_stale_and_keeps_fresh_counts_match`;
// the old name contradicted the assertions, which verify that nothing
// is ever deleted (stale memories are floored, not removed).
#[tokio::test]
async fn decay_floors_stale_and_keeps_fresh_counts_match() {
    let ns = "_dakera_agent_decay_mixed";
    let storage: Arc<dyn VectorStorage> = Arc::new(InMemoryStorage::new());
    storage.ensure_namespace(&ns.to_string()).await.unwrap();
    // Three memories idle for ~200h: far past the 1-hour half-life.
    for i in 0..3u32 {
        let v = memory_vector(&format!("stale{}", i), "agent1", 200 * 3600, 0.8);
        storage.upsert(&ns.to_string(), vec![v]).await.unwrap();
    }
    // Two memories touched one second ago: effectively undecayed.
    for i in 0..2u32 {
        let v = memory_vector(&format!("fresh{}", i), "agent1", 1, 0.9);
        storage.upsert(&ns.to_string(), vec![v]).await.unwrap();
    }
    let result = fast_decay()
        .apply_decay(&storage, &std::collections::HashMap::new())
        .await;
    assert_eq!(
        result.memories_processed, 5,
        "all 5 memories should be evaluated"
    );
    assert_eq!(
        result.memories_deleted, 0,
        "decay engine must not hard-delete any memories; deleted={}",
        result.memories_deleted
    );
    assert_eq!(
        result.memories_floored, 3,
        "3 stale memories should be floored at min_importance; floored={}",
        result.memories_floored
    );
    let remaining = storage.get_all(&ns.to_string()).await.unwrap();
    assert_eq!(
        remaining.len(),
        5,
        "all 5 memories must remain in storage (3 floored + 2 fresh)"
    );
}
// Single deeply stale memory: its decayed importance would fall below
// the 0.5 floor, so the engine must clamp it to min_importance in
// place and never remove the record from storage.
#[tokio::test]
async fn decay_floors_at_min_importance_never_hard_deletes() {
let ns = "_dakera_agent_decay_floor";
let storage: Arc<dyn VectorStorage> = Arc::new(InMemoryStorage::new());
storage.ensure_namespace(&ns.to_string()).await.unwrap();
// ~200 hours idle against a 1-hour half-life: deeply decayed.
let stale = memory_vector("floor_mem", "agent1", 200 * 3600, 0.8);
storage.upsert(&ns.to_string(), vec![stale]).await.unwrap();
let result = fast_decay()
.apply_decay(&storage, &std::collections::HashMap::new())
.await;
assert_eq!(
result.memories_deleted, 0,
"decay engine must never hard-delete memories; deleted={}",
result.memories_deleted
);
assert_eq!(
result.memories_floored, 1,
"memory below min_importance must be floored, not deleted; floored={}",
result.memories_floored
);
let remaining = storage.get_all(&ns.to_string()).await.unwrap();
assert_eq!(remaining.len(), 1, "floored memory must remain in storage");
let mem = Memory::from_vector(&remaining[0]).expect("must be a valid memory vector");
assert!(
(mem.importance - 0.5).abs() < 0.001,
"importance must be floored at min_importance=0.5; got {}",
mem.importance
);
}
// Two memories identical except for access_count (0 vs 20), both idle
// for 50 hours. After a sweep with a 20-hour half-life and a near-zero
// floor, the frequently recalled one must retain strictly higher
// importance — i.e. recall frequency slows decay.
// NOTE(review): this pins only the ordering, not an exact formula;
// assumes the engine weights decay by access_count — confirm.
#[tokio::test]
async fn decay_access_weighted_decay_slows_for_recalled_memories() {
let ns = "_dakera_agent_decay_access_weight";
let storage: Arc<dyn VectorStorage> = Arc::new(InMemoryStorage::new());
storage.ensure_namespace(&ns.to_string()).await.unwrap();
let ts = secs_ago(50 * 3600);
// Baseline: never recalled since creation.
let never_recalled = Memory {
id: "never".to_string(),
memory_type: MemoryType::Episodic,
content: "test".to_string(),
agent_id: "agent1".to_string(),
session_id: None,
importance: 0.8,
tags: vec![],
metadata: None,
created_at: ts,
last_accessed_at: ts,
access_count: 0,
ttl_seconds: None,
expires_at: None,
};
// Same timestamps and importance, but recalled 20 times.
let freq_recalled = Memory {
id: "frequent".to_string(),
memory_type: MemoryType::Episodic,
content: "test".to_string(),
agent_id: "agent1".to_string(),
session_id: None,
importance: 0.8,
tags: vec![],
metadata: None,
created_at: ts,
last_accessed_at: ts,
access_count: 20,
ttl_seconds: None,
expires_at: None,
};
storage
.upsert(
&ns.to_string(),
vec![
never_recalled.to_vector(vec![0.1, 0.2, 0.3]),
freq_recalled.to_vector(vec![0.1, 0.2, 0.3]),
],
)
.await
.unwrap();
// Gentle decay with an effectively disabled floor so relative
// importance differences remain observable after the sweep.
let engine = DecayEngine::new(DecayConfig {
strategy: DecayStrategy::Exponential,
half_life_hours: 20.0,
min_importance: 0.001,
});
let _ = engine
.apply_decay(&storage, &std::collections::HashMap::new())
.await;
let remaining = storage.get_all(&ns.to_string()).await.unwrap();
assert_eq!(remaining.len(), 2, "both memories must remain");
// Looks up a memory's post-sweep importance by id.
let get_importance = |id: &str| {
remaining
.iter()
.find(|v| v.id == id)
.and_then(|v| Memory::from_vector(v))
.map(|m| m.importance)
.expect("memory must exist")
};
let never_imp = get_importance("never");
let freq_imp = get_importance("frequent");
assert!(
freq_imp > never_imp,
"frequently recalled memory (importance={freq_imp:.4}) must decay slower \
than never-recalled (importance={never_imp:.4})"
);
}
// The id "x" lives in two namespaces with different embeddings; each
// namespace must resolve "x" to its own vector, and neither write may
// clobber the other.
#[tokio::test]
async fn same_id_in_two_namespaces_is_independent() {
let ns_a = "int_ns_iso_a";
let ns_b = "int_ns_iso_b";
let storage = Arc::new(InMemoryStorage::new());
let engine = SearchEngine::new(storage.clone());
storage.ensure_namespace(&ns_a.to_string()).await.unwrap();
storage.ensure_namespace(&ns_b.to_string()).await.unwrap();
// Same id, different embeddings per namespace.
storage
.upsert(&ns_a.to_string(), vec![bare_vec("x", vec![1.0, 0.0, 0.0])])
.await
.unwrap();
storage
.upsert(&ns_b.to_string(), vec![bare_vec("x", vec![0.0, 1.0, 0.0])])
.await
.unwrap();
let ra = engine
.search(&ns_a.to_string(), &cosine_query(vec![1.0, 0.0, 0.0], 1))
.await
.unwrap();
assert_eq!(ra.results[0].id, "x");
assert!(
ra.results[0].score > 0.99,
"ns_a score={}",
ra.results[0].score
);
let rb = engine
.search(&ns_b.to_string(), &cosine_query(vec![0.0, 1.0, 0.0], 1))
.await
.unwrap();
assert_eq!(rb.results[0].id, "x");
assert!(
rb.results[0].score > 0.99,
"ns_b score={}",
rb.results[0].score
);
// Cross-check: ns_a's "x" is [1,0,0], orthogonal to the [0,1,0]
// query — a near-zero score proves ns_b's copy did not leak in.
let rc = engine
.search(&ns_a.to_string(), &cosine_query(vec![0.0, 1.0, 0.0], 1))
.await
.unwrap();
assert!(
rc.results[0].score < 0.01,
"ns_a 'x' should be orthogonal to [0,1,0], score={}",
rc.results[0].score
);
}
// Dropping ns_a must not disturb ns_b's existence or contents.
#[tokio::test]
async fn deleting_one_namespace_leaves_other_intact() {
    let ns_a = "int_ns_del_a";
    let ns_b = "int_ns_del_b";
    let storage = Arc::new(InMemoryStorage::new());
    for ns in [ns_a, ns_b] {
        storage.ensure_namespace(&ns.to_string()).await.unwrap();
        storage
            .upsert(&ns.to_string(), vec![bare_vec("v1", vec![1.0, 0.0])])
            .await
            .unwrap();
    }
    storage.delete_namespace(&ns_a.to_string()).await.unwrap();
    assert!(
        !storage.namespace_exists(&ns_a.to_string()).await.unwrap(),
        "ns_a should be gone"
    );
    assert!(
        storage.namespace_exists(&ns_b.to_string()).await.unwrap(),
        "ns_b should still exist"
    );
    assert_eq!(
        storage.count(&ns_b.to_string()).await.unwrap(),
        1,
        "ns_b count unchanged"
    );
}
// Vectors written to ns_a must be invisible to searches in ns_b, and
// the per-namespace counts must remain independent.
#[tokio::test]
async fn vectors_do_not_leak_across_namespaces() {
let ns_a = "int_ns_leak_a";
let ns_b = "int_ns_leak_b";
let storage = Arc::new(InMemoryStorage::new());
let engine = SearchEngine::new(storage.clone());
storage.ensure_namespace(&ns_a.to_string()).await.unwrap();
storage.ensure_namespace(&ns_b.to_string()).await.unwrap();
storage
.upsert(
&ns_a.to_string(),
vec![
bare_vec("only_in_a", vec![1.0, 0.0, 0.0]),
bare_vec("also_in_a", vec![0.5, 0.5, 0.0]),
],
)
.await
.unwrap();
// Search the other (empty) namespace with a query that would match
// ns_a's contents perfectly.
let resp = engine
.search(&ns_b.to_string(), &cosine_query(vec![1.0, 0.0, 0.0], 10))
.await
.unwrap();
assert!(
resp.results.is_empty(),
"ns_b should be empty but got {} results",
resp.results.len()
);
assert_eq!(storage.count(&ns_a.to_string()).await.unwrap(), 2);
assert_eq!(storage.count(&ns_b.to_string()).await.unwrap(), 0);
}
// Explicitly invalidating the ANN index and then upserting must not
// break later searches: results after the invalidation must include
// both the pre-existing vectors and the newly added one.
#[tokio::test]
async fn ann_index_invalidation_does_not_break_subsequent_search() {
let ns = "int_ns_ann_invalidate";
let (engine, storage) = make_engine(ns).await;
storage
.upsert(
&ns.to_string(),
vec![
bare_vec("a", vec![1.0, 0.0, 0.0]),
bare_vec("b", vec![0.0, 1.0, 0.0]),
],
)
.await
.unwrap();
// First search — presumably builds/warms the ANN index for ns.
let resp1 = engine
.search(&ns.to_string(), &cosine_query(vec![1.0, 0.0, 0.0], 1))
.await
.unwrap();
assert_eq!(resp1.results[0].id, "a");
// Drop the cached index, then mutate the namespace underneath it.
engine.invalidate_ann_index(&ns.to_string());
storage
.upsert(
&ns.to_string(),
vec![bare_vec("a_prime", vec![0.99, 0.01, 0.0])],
)
.await
.unwrap();
let resp2 = engine
.search(&ns.to_string(), &cosine_query(vec![1.0, 0.0, 0.0], 3))
.await
.unwrap();
assert_eq!(resp2.results.len(), 3, "all 3 vectors should be returned");
let ids: Vec<&str> = resp2.results.iter().map(|r| r.id.as_str()).collect();
assert!(ids.contains(&"a"), "a must be in results");
assert!(
ids.contains(&"a_prime"),
"a_prime must appear after invalidation"
);
}