1use std::sync::OnceLock;
4
5use async_trait::async_trait;
6use uuid::Uuid;
7
8use khive_types::SubstrateKind;
9
10use crate::capability::StorageCapability;
11use crate::error::StorageError;
12use crate::types::{
13 BatchWriteSummary, IndexRebuildScope, StorageResult, VectorIndexKind, VectorMetadataFilter,
14 VectorRecord, VectorSearchHit, VectorSearchRequest, VectorStoreCapabilities, VectorStoreInfo,
15};
16
17#[async_trait]
18pub trait VectorStore: Send + Sync + 'static {
19 async fn insert(
22 &self,
23 subject_id: Uuid,
24 kind: SubstrateKind,
25 namespace: &str,
26 embedding: Vec<f32>,
27 ) -> StorageResult<()>;
28 async fn insert_batch(&self, records: Vec<VectorRecord>) -> StorageResult<BatchWriteSummary>;
29 async fn delete(&self, subject_id: Uuid) -> StorageResult<bool>;
30 async fn count(&self) -> StorageResult<u64>;
31 async fn search(&self, request: VectorSearchRequest) -> StorageResult<Vec<VectorSearchHit>>;
32 async fn info(&self) -> StorageResult<VectorStoreInfo>;
33 async fn rebuild(&self, scope: IndexRebuildScope) -> StorageResult<VectorStoreInfo>;
34
35 fn capabilities(&self) -> &'static VectorStoreCapabilities {
44 static BASELINE: OnceLock<VectorStoreCapabilities> = OnceLock::new();
45 BASELINE.get_or_init(|| VectorStoreCapabilities {
46 supports_filter: false,
47 supports_batch_search: false,
48 supports_quantization: false,
49 supports_update: false,
50 max_dimensions: Some(8192),
54 index_kinds: vec![VectorIndexKind::SqliteVec],
55 })
56 }
57
58 async fn search_with_filter(
68 &self,
69 request: VectorSearchRequest,
70 filter: VectorMetadataFilter,
71 ) -> StorageResult<Vec<VectorSearchHit>> {
72 if filter.is_empty() {
73 return self.search(request).await;
74 }
75 Err(StorageError::Unsupported {
76 capability: StorageCapability::Vectors,
77 operation: "search_with_filter".into(),
78 message: "filter pushdown not supported by this backend".into(),
79 })
80 }
81
82 async fn search_batch(
88 &self,
89 requests: Vec<VectorSearchRequest>,
90 ) -> StorageResult<Vec<Vec<VectorSearchHit>>> {
91 let mut out = Vec::with_capacity(requests.len());
92 for req in requests {
93 out.push(self.search(req).await?);
94 }
95 Ok(out)
96 }
97
98 async fn update(
104 &self,
105 subject_id: Uuid,
106 kind: SubstrateKind,
107 namespace: &str,
108 embedding: Vec<f32>,
109 ) -> StorageResult<()> {
110 self.delete(subject_id).await?;
111 self.insert(subject_id, kind, namespace, embedding).await
112 }
113}
114
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};

    use uuid::Uuid;

    use khive_types::SubstrateKind;

    use super::*;
    use crate::error::StorageError;
    use crate::types::{
        BatchWriteSummary, IndexRebuildScope, VectorIndexKind, VectorMetadataFilter,
        VectorSearchHit, VectorSearchRequest, VectorStoreInfo,
    };

    /// `VectorStore` test double that records which write methods ran and can be told to fail
    /// `delete` or `insert` on demand.
    ///
    /// It implements only the required trait methods, so tests going through it exercise the
    /// trait's default bodies.
    #[derive(Default)]
    struct TestVectorStore {
        /// When set, `delete` returns an injected `InvalidInput` error.
        fail_delete: AtomicBool,
        /// When set, `insert` returns an injected `InvalidInput` error.
        fail_insert: AtomicBool,
        /// Set on entry to `delete`, before any failure is injected.
        delete_called: AtomicBool,
        /// Set on entry to `insert`, before any failure is injected.
        insert_called: AtomicBool,
        /// Snapshot of `delete_called` taken on entry to `insert`. It lets tests check the
        /// *order* of the calls, not just that both happened.
        delete_seen_by_insert: AtomicBool,
    }

    impl TestVectorStore {
        fn new() -> Self {
            Self::default()
        }

        fn with_fail_delete() -> Self {
            Self {
                fail_delete: AtomicBool::new(true),
                ..Self::new()
            }
        }

        fn with_fail_insert() -> Self {
            Self {
                fail_insert: AtomicBool::new(true),
                ..Self::new()
            }
        }
    }

    #[async_trait]
    impl VectorStore for TestVectorStore {
        async fn insert(
            &self,
            _subject_id: Uuid,
            _kind: SubstrateKind,
            _namespace: &str,
            _embedding: Vec<f32>,
        ) -> StorageResult<()> {
            self.delete_seen_by_insert
                .store(self.delete_called.load(Ordering::SeqCst), Ordering::SeqCst);
            self.insert_called.store(true, Ordering::SeqCst);
            if self.fail_insert.load(Ordering::SeqCst) {
                return Err(StorageError::InvalidInput {
                    capability: StorageCapability::Vectors,
                    operation: "insert".into(),
                    message: "injected insert failure".into(),
                });
            }
            Ok(())
        }

        async fn insert_batch(
            &self,
            records: Vec<VectorRecord>,
        ) -> StorageResult<BatchWriteSummary> {
            Ok(BatchWriteSummary {
                attempted: records.len() as u64,
                affected: records.len() as u64,
                failed: 0,
                first_error: String::new(),
            })
        }

        async fn delete(&self, _subject_id: Uuid) -> StorageResult<bool> {
            self.delete_called.store(true, Ordering::SeqCst);
            if self.fail_delete.load(Ordering::SeqCst) {
                return Err(StorageError::InvalidInput {
                    capability: StorageCapability::Vectors,
                    operation: "delete".into(),
                    message: "injected delete failure".into(),
                });
            }
            Ok(true)
        }

        async fn count(&self) -> StorageResult<u64> {
            Ok(0)
        }

        async fn search(
            &self,
            _request: VectorSearchRequest,
        ) -> StorageResult<Vec<VectorSearchHit>> {
            // A single sentinel hit, so callers can tell the result came from `search`.
            Ok(vec![VectorSearchHit {
                subject_id: Uuid::nil(),
                score: khive_score::DeterministicScore::from_f64(0.9),
                rank: 1,
            }])
        }

        async fn info(&self) -> StorageResult<VectorStoreInfo> {
            Ok(VectorStoreInfo {
                model_name: "test".into(),
                dimensions: 4,
                index_kind: VectorIndexKind::SqliteVec,
                entry_count: 0,
                needs_rebuild: false,
                last_rebuild_at: None,
            })
        }

        async fn rebuild(&self, _scope: IndexRebuildScope) -> StorageResult<VectorStoreInfo> {
            self.info().await
        }
    }

    // `capabilities` is synchronous, so these two tests need no async runtime.
    #[test]
    fn capabilities_returns_baseline_defaults() {
        let store = TestVectorStore::new();
        let caps = store.capabilities();
        assert!(!caps.supports_filter);
        assert!(!caps.supports_batch_search);
        assert!(!caps.supports_quantization);
        assert!(!caps.supports_update);
        assert_eq!(caps.max_dimensions, Some(8192));
        assert_eq!(caps.index_kinds, vec![VectorIndexKind::SqliteVec]);
    }

    #[test]
    fn baseline_max_dimensions_is_sqlite_vec_hard_limit() {
        let store = TestVectorStore::new();
        let max = store
            .capabilities()
            .max_dimensions
            .expect("baseline must declare a finite dimension limit");
        // Equality rather than `>=`. Advertising more than the vec0 limit would let callers
        // submit embeddings the backend rejects; advertising less would refuse valid ones.
        assert_eq!(
            max, 8192,
            "baseline max_dimensions ({max}) must equal SQLITE_VEC_VEC0_MAX_DIMENSIONS (8192)"
        );
    }

    #[tokio::test]
    async fn search_with_filter_empty_filter_delegates_to_search() {
        let store = TestVectorStore::new();
        let req = VectorSearchRequest {
            query_embedding: vec![0.1, 0.2, 0.3, 0.4],
            top_k: 5,
            namespace: None,
            kind: None,
        };
        let filter = VectorMetadataFilter::default();
        let hits = store
            .search_with_filter(req, filter)
            .await
            .expect("an empty filter must delegate to search");
        assert_eq!(hits.len(), 1);
        // The sentinel hit proves the call was routed to `search`.
        assert_eq!(hits[0].subject_id, Uuid::nil());
    }

    #[tokio::test]
    async fn search_with_filter_non_empty_filter_returns_unsupported() {
        let store = TestVectorStore::new();
        let req = VectorSearchRequest {
            query_embedding: vec![0.1, 0.2, 0.3, 0.4],
            top_k: 5,
            namespace: None,
            kind: None,
        };
        let filter = VectorMetadataFilter {
            namespaces: vec!["ns:agent".into()],
            kinds: vec![],
            properties: vec![],
        };
        let result = store.search_with_filter(req, filter).await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            matches!(err, StorageError::Unsupported { .. }),
            "expected Unsupported, got {err:?}"
        );
    }

    #[tokio::test]
    async fn search_batch_returns_one_result_per_request() {
        let store = TestVectorStore::new();
        let requests = vec![
            VectorSearchRequest {
                query_embedding: vec![0.1, 0.2, 0.3, 0.4],
                top_k: 3,
                namespace: None,
                kind: None,
            },
            VectorSearchRequest {
                query_embedding: vec![0.5, 0.6, 0.7, 0.8],
                top_k: 3,
                namespace: None,
                kind: None,
            },
        ];
        let batched = store
            .search_batch(requests)
            .await
            .expect("search_batch over a healthy store must succeed");
        assert_eq!(batched.len(), 2, "should return one result set per request");
        for hits in &batched {
            assert_eq!(hits.len(), 1, "each result set should have one hit");
        }
    }

    #[tokio::test]
    async fn search_batch_propagates_search_error() {
        /// Store whose `search` always fails. Every other method is a trivial success.
        struct FailingSearch;

        #[async_trait]
        impl VectorStore for FailingSearch {
            async fn insert(
                &self,
                _: Uuid,
                _: SubstrateKind,
                _: &str,
                _: Vec<f32>,
            ) -> StorageResult<()> {
                Ok(())
            }
            async fn insert_batch(&self, _: Vec<VectorRecord>) -> StorageResult<BatchWriteSummary> {
                Ok(BatchWriteSummary::default())
            }
            async fn delete(&self, _: Uuid) -> StorageResult<bool> {
                Ok(false)
            }
            async fn count(&self) -> StorageResult<u64> {
                Ok(0)
            }
            async fn search(&self, _: VectorSearchRequest) -> StorageResult<Vec<VectorSearchHit>> {
                Err(StorageError::InvalidInput {
                    capability: StorageCapability::Vectors,
                    operation: "search".into(),
                    message: "injected search failure".into(),
                })
            }
            async fn info(&self) -> StorageResult<VectorStoreInfo> {
                Ok(VectorStoreInfo {
                    model_name: "fail".into(),
                    dimensions: 4,
                    index_kind: VectorIndexKind::SqliteVec,
                    entry_count: 0,
                    needs_rebuild: false,
                    last_rebuild_at: None,
                })
            }
            async fn rebuild(&self, _: IndexRebuildScope) -> StorageResult<VectorStoreInfo> {
                self.info().await
            }
        }

        let store = FailingSearch;
        let requests = vec![VectorSearchRequest {
            query_embedding: vec![0.1],
            top_k: 1,
            namespace: None,
            kind: None,
        }];
        let result = store.search_batch(requests).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn update_calls_delete_then_insert() {
        let store = TestVectorStore::new();
        let id = Uuid::new_v4();
        let result = store
            .update(id, SubstrateKind::Entity, "ns:test", vec![0.1, 0.2])
            .await;
        assert!(result.is_ok());
        assert!(
            store.delete_called.load(Ordering::SeqCst),
            "delete must be called"
        );
        assert!(
            store.insert_called.load(Ordering::SeqCst),
            "insert must be called after delete"
        );
        assert!(
            store.delete_seen_by_insert.load(Ordering::SeqCst),
            "delete must run before insert starts"
        );
    }

    #[tokio::test]
    async fn update_propagates_delete_failure() {
        let store = TestVectorStore::with_fail_delete();
        let id = Uuid::new_v4();
        let result = store
            .update(id, SubstrateKind::Entity, "ns:test", vec![0.1, 0.2])
            .await;
        assert!(result.is_err());
        assert!(
            store.delete_called.load(Ordering::SeqCst),
            "delete must be attempted"
        );
        assert!(
            !store.insert_called.load(Ordering::SeqCst),
            "insert must NOT be called when delete fails"
        );
    }

    #[tokio::test]
    async fn update_propagates_insert_failure() {
        let store = TestVectorStore::with_fail_insert();
        let id = Uuid::new_v4();
        let result = store
            .update(id, SubstrateKind::Entity, "ns:test", vec![0.1, 0.2])
            .await;
        assert!(result.is_err());
        // The default `update` deletes first, so the delete has already happened by the time
        // the insert fails.
        assert!(
            store.delete_called.load(Ordering::SeqCst),
            "delete must run before the failing insert"
        );
        assert!(
            store.insert_called.load(Ordering::SeqCst),
            "insert must be attempted"
        );
    }
}