1use std::collections::HashSet;
4use std::sync::OnceLock;
5
6use async_trait::async_trait;
7use uuid::Uuid;
8
9use khive_types::SubstrateKind;
10
11use crate::capability::StorageCapability;
12use crate::error::StorageError;
13use crate::types::{
14 BatchWriteSummary, IndexRebuildScope, OrphanSweepConfig, OrphanSweepResult, StorageResult,
15 VectorIndexKind, VectorMetadataFilter, VectorRecord, VectorSearchHit, VectorSearchRequest,
16 VectorStoreCapabilities, VectorStoreInfo,
17};
18
19#[async_trait]
20pub trait VectorStore: Send + Sync + 'static {
21 async fn insert(
24 &self,
25 subject_id: Uuid,
26 kind: SubstrateKind,
27 namespace: &str,
28 field: &str,
29 vectors: Vec<Vec<f32>>,
30 ) -> StorageResult<()>;
31 async fn insert_batch(&self, records: Vec<VectorRecord>) -> StorageResult<BatchWriteSummary>;
32 async fn delete(&self, subject_id: Uuid) -> StorageResult<bool>;
33 async fn count(&self) -> StorageResult<u64>;
34 async fn search(&self, request: VectorSearchRequest) -> StorageResult<Vec<VectorSearchHit>>;
35 async fn info(&self) -> StorageResult<VectorStoreInfo>;
36 async fn rebuild(&self, scope: IndexRebuildScope) -> StorageResult<VectorStoreInfo>;
37
38 fn capabilities(&self) -> &'static VectorStoreCapabilities {
47 static BASELINE: OnceLock<VectorStoreCapabilities> = OnceLock::new();
48 BASELINE.get_or_init(|| VectorStoreCapabilities {
49 supports_filter: false,
50 supports_batch_search: false,
51 supports_quantization: false,
52 supports_update: false,
53 supports_orphan_sweep: false,
54 supports_multi_field: false,
55 max_dimensions: Some(8192),
59 index_kinds: vec![VectorIndexKind::SqliteVec],
60 })
61 }
62
63 async fn search_with_filter(
76 &self,
77 request: &VectorSearchRequest,
78 filter: &VectorMetadataFilter,
79 ) -> StorageResult<Vec<VectorSearchHit>> {
80 if filter.is_empty() {
81 return self.search(request.clone()).await;
82 }
83 debug_assert!(
84 !self.capabilities().supports_filter,
85 "backend claims supports_filter=true but did not override search_with_filter"
86 );
87 Err(StorageError::Unsupported {
88 capability: StorageCapability::Vectors,
89 operation: "search_with_filter".into(),
90 message: "filter pushdown not supported; set supports_filter=true only when overriding this method".into(),
91 })
92 }
93
94 async fn search_batch(
100 &self,
101 requests: &[VectorSearchRequest],
102 ) -> StorageResult<Vec<StorageResult<Vec<VectorSearchHit>>>> {
103 let mut out = Vec::with_capacity(requests.len());
104 for req in requests {
105 out.push(self.search(req.clone()).await);
106 }
107 Ok(out)
108 }
109
110 async fn update(
116 &self,
117 subject_id: Uuid,
118 kind: SubstrateKind,
119 namespace: &str,
120 field: &str,
121 vectors: Vec<Vec<f32>>,
122 ) -> StorageResult<()> {
123 self.delete(subject_id).await?;
124 self.insert(subject_id, kind, namespace, field, vectors)
125 .await
126 }
127
128 async fn orphan_sweep(&self, config: &OrphanSweepConfig) -> StorageResult<OrphanSweepResult> {
133 let _ = config;
134 Err(StorageError::Unsupported {
135 capability: StorageCapability::Vectors,
136 operation: "orphan_sweep".into(),
137 message: "this backend does not support orphan sweep".into(),
138 })
139 }
140
141 async fn batch_exists(&self, ids: &[Uuid], namespace: &str) -> StorageResult<HashSet<Uuid>> {
148 let _ = (ids, namespace);
149 Err(StorageError::Unsupported {
150 capability: StorageCapability::Vectors,
151 operation: "batch_exists".into(),
152 message: "this backend does not support batch existence checks".into(),
153 })
154 }
155}
156
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};

    use uuid::Uuid;

    use khive_types::SubstrateKind;

    use super::*;
    use crate::error::StorageError;
    use crate::types::{
        BatchWriteSummary, IndexRebuildScope, OrphanSweepConfig, VectorIndexKind,
        VectorMetadataFilter, VectorSearchHit, VectorSearchRequest, VectorStoreInfo,
    };

    /// Minimal `VectorStore` test double that records which primitives were
    /// called and can be told to fail `delete` or `insert`.
    struct TestVectorStore {
        /// When set, `delete` returns an `InvalidInput` error.
        fail_delete: AtomicBool,
        /// When set, `insert` returns an `InvalidInput` error.
        fail_insert: AtomicBool,
        /// Set once `delete` has been called.
        delete_called: AtomicBool,
        /// Set once `insert` has been called.
        insert_called: AtomicBool,
        /// Value of `delete_called` observed when `insert` ran. Lets tests
        /// verify that `delete` happened *before* `insert`, not merely that
        /// both happened.
        insert_saw_delete: AtomicBool,
    }

    impl TestVectorStore {
        /// A store with no injected failures.
        fn new() -> Self {
            Self {
                fail_delete: AtomicBool::new(false),
                fail_insert: AtomicBool::new(false),
                delete_called: AtomicBool::new(false),
                insert_called: AtomicBool::new(false),
                insert_saw_delete: AtomicBool::new(false),
            }
        }

        /// A store whose `delete` always fails.
        fn with_fail_delete() -> Self {
            let s = Self::new();
            s.fail_delete.store(true, Ordering::SeqCst);
            s
        }

        /// A store whose `insert` always fails.
        fn with_fail_insert() -> Self {
            let s = Self::new();
            s.fail_insert.store(true, Ordering::SeqCst);
            s
        }
    }

    #[async_trait]
    impl VectorStore for TestVectorStore {
        async fn insert(
            &self,
            _subject_id: Uuid,
            _kind: SubstrateKind,
            _namespace: &str,
            _field: &str,
            _vectors: Vec<Vec<f32>>,
        ) -> StorageResult<()> {
            // Snapshot the delete flag before recording this call, so ordering
            // between delete and insert is observable.
            self.insert_saw_delete
                .store(self.delete_called.load(Ordering::SeqCst), Ordering::SeqCst);
            self.insert_called.store(true, Ordering::SeqCst);
            if self.fail_insert.load(Ordering::SeqCst) {
                return Err(StorageError::InvalidInput {
                    capability: StorageCapability::Vectors,
                    operation: "insert".into(),
                    message: "injected insert failure".into(),
                });
            }
            Ok(())
        }

        async fn insert_batch(
            &self,
            records: Vec<VectorRecord>,
        ) -> StorageResult<BatchWriteSummary> {
            Ok(BatchWriteSummary {
                attempted: records.len() as u64,
                affected: records.len() as u64,
                failed: 0,
                first_error: String::new(),
            })
        }

        async fn delete(&self, _subject_id: Uuid) -> StorageResult<bool> {
            self.delete_called.store(true, Ordering::SeqCst);
            if self.fail_delete.load(Ordering::SeqCst) {
                return Err(StorageError::InvalidInput {
                    capability: StorageCapability::Vectors,
                    operation: "delete".into(),
                    message: "injected delete failure".into(),
                });
            }
            Ok(true)
        }

        async fn count(&self) -> StorageResult<u64> {
            Ok(0)
        }

        /// Always returns exactly one hit for `Uuid::nil()`.
        async fn search(
            &self,
            _request: VectorSearchRequest,
        ) -> StorageResult<Vec<VectorSearchHit>> {
            Ok(vec![VectorSearchHit {
                subject_id: Uuid::nil(),
                score: khive_score::DeterministicScore::from_f64(0.9),
                rank: 1,
            }])
        }

        async fn info(&self) -> StorageResult<VectorStoreInfo> {
            Ok(VectorStoreInfo {
                model_name: "test".into(),
                dimensions: 4,
                index_kind: VectorIndexKind::SqliteVec,
                entry_count: 0,
                needs_rebuild: false,
                last_rebuild_at: None,
            })
        }

        async fn rebuild(&self, _scope: IndexRebuildScope) -> StorageResult<VectorStoreInfo> {
            self.info().await
        }
    }

    /// Baseline search request shared by the tests. Tests that need different
    /// values override individual fields with struct-update syntax.
    fn sample_request() -> VectorSearchRequest {
        VectorSearchRequest {
            query_vectors: vec![vec![0.1, 0.2, 0.3, 0.4]],
            top_k: 5,
            namespace: None,
            kind: None,
            embedding_model: None,
            filter: None,
            backend_hints: None,
        }
    }

    #[test]
    fn capabilities_returns_baseline_defaults() {
        let store = TestVectorStore::new();
        let caps = store.capabilities();
        assert!(!caps.supports_filter);
        assert!(!caps.supports_batch_search);
        assert!(!caps.supports_quantization);
        assert!(!caps.supports_update);
        assert!(!caps.supports_orphan_sweep);
        assert!(!caps.supports_multi_field);
        assert_eq!(caps.max_dimensions, Some(8192));
        assert_eq!(caps.index_kinds, vec![VectorIndexKind::SqliteVec]);
    }

    #[test]
    fn baseline_max_dimensions_is_sqlite_vec_hard_limit() {
        let store = TestVectorStore::new();
        let caps = store.capabilities();
        let max = caps
            .max_dimensions
            .expect("baseline must declare a finite dimension limit");
        assert!(
            max >= 8192,
            "baseline max_dimensions ({max}) must be at least 8192 — SQLITE_VEC_VEC0_MAX_DIMENSIONS"
        );
    }

    #[tokio::test]
    async fn search_with_filter_empty_filter_delegates_to_search() {
        let store = TestVectorStore::new();
        let req = sample_request();
        let filter = VectorMetadataFilter::default();
        let hits = store
            .search_with_filter(&req, &filter)
            .await
            .expect("an empty filter must delegate to search");
        // TestVectorStore::search always yields exactly one nil-id hit.
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, Uuid::nil());
    }

    #[tokio::test]
    async fn search_with_filter_non_empty_filter_returns_unsupported() {
        let store = TestVectorStore::new();
        let req = sample_request();
        let filter = VectorMetadataFilter {
            namespaces: vec!["ns:agent".into()],
            kinds: vec![],
            property_filters: vec![],
        };
        let result = store.search_with_filter(&req, &filter).await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            matches!(err, StorageError::Unsupported { .. }),
            "expected Unsupported, got {err:?}"
        );
    }

    #[tokio::test]
    async fn search_batch_returns_one_result_per_request() {
        let store = TestVectorStore::new();
        let requests = vec![
            VectorSearchRequest {
                top_k: 3,
                ..sample_request()
            },
            VectorSearchRequest {
                query_vectors: vec![vec![0.5, 0.6, 0.7, 0.8]],
                top_k: 3,
                ..sample_request()
            },
        ];
        let result = store.search_batch(&requests).await;
        assert!(result.is_ok());
        let batched = result.unwrap();
        assert_eq!(batched.len(), 2, "should return one result set per request");
        for inner in &batched {
            assert!(inner.is_ok(), "each inner result should be Ok");
            assert_eq!(
                inner.as_ref().unwrap().len(),
                1,
                "each Ok should have one hit"
            );
        }
    }

    #[tokio::test]
    async fn search_batch_isolates_per_query_errors() {
        /// Store whose `search` always fails; every other method is inert.
        struct FailingSearch;

        #[async_trait]
        impl VectorStore for FailingSearch {
            async fn insert(
                &self,
                _: Uuid,
                _: SubstrateKind,
                _: &str,
                _: &str,
                _: Vec<Vec<f32>>,
            ) -> StorageResult<()> {
                Ok(())
            }

            async fn insert_batch(&self, _: Vec<VectorRecord>) -> StorageResult<BatchWriteSummary> {
                Ok(BatchWriteSummary::default())
            }

            async fn delete(&self, _: Uuid) -> StorageResult<bool> {
                Ok(false)
            }

            async fn count(&self) -> StorageResult<u64> {
                Ok(0)
            }

            async fn search(&self, _: VectorSearchRequest) -> StorageResult<Vec<VectorSearchHit>> {
                Err(StorageError::InvalidInput {
                    capability: StorageCapability::Vectors,
                    operation: "search".into(),
                    message: "injected search failure".into(),
                })
            }

            async fn info(&self) -> StorageResult<VectorStoreInfo> {
                Ok(VectorStoreInfo {
                    model_name: "fail".into(),
                    dimensions: 4,
                    index_kind: VectorIndexKind::SqliteVec,
                    entry_count: 0,
                    needs_rebuild: false,
                    last_rebuild_at: None,
                })
            }

            async fn rebuild(&self, _: IndexRebuildScope) -> StorageResult<VectorStoreInfo> {
                self.info().await
            }
        }

        let store = FailingSearch;
        let requests = vec![VectorSearchRequest {
            query_vectors: vec![vec![0.1]],
            top_k: 1,
            ..sample_request()
        }];
        let result = store.search_batch(&requests).await;
        // A failing query must not fail the whole batch.
        assert!(result.is_ok(), "outer result must be Ok for batch");
        let batched = result.unwrap();
        assert_eq!(batched.len(), 1);
        assert!(batched[0].is_err(), "inner result must carry the error");
    }

    #[tokio::test]
    async fn orphan_sweep_default_returns_unsupported() {
        let store = TestVectorStore::new();
        let config = OrphanSweepConfig {
            subject_id_allowlist: None,
            namespaces: vec![],
            substrate_kinds: vec![],
            max_delete: 100,
            dry_run: true,
        };
        let result = store.orphan_sweep(&config).await;
        assert!(
            matches!(result, Err(StorageError::Unsupported { .. })),
            "expected Unsupported, got {result:?}"
        );
        assert!(!store.capabilities().supports_orphan_sweep);
    }

    #[tokio::test]
    async fn update_calls_delete_then_insert() {
        let store = TestVectorStore::new();
        let id = Uuid::new_v4();
        let result = store
            .update(
                id,
                SubstrateKind::Entity,
                "ns:test",
                "body",
                vec![vec![0.1, 0.2]],
            )
            .await;
        assert!(result.is_ok());
        assert!(
            store.delete_called.load(Ordering::SeqCst),
            "delete must be called"
        );
        assert!(
            store.insert_called.load(Ordering::SeqCst),
            "insert must be called"
        );
        assert!(
            store.insert_saw_delete.load(Ordering::SeqCst),
            "insert must be called after delete"
        );
    }

    #[tokio::test]
    async fn update_propagates_delete_failure() {
        let store = TestVectorStore::with_fail_delete();
        let id = Uuid::new_v4();
        let result = store
            .update(
                id,
                SubstrateKind::Entity,
                "ns:test",
                "body",
                vec![vec![0.1, 0.2]],
            )
            .await;
        assert!(result.is_err());
        assert!(
            store.delete_called.load(Ordering::SeqCst),
            "delete must be attempted"
        );
        assert!(
            !store.insert_called.load(Ordering::SeqCst),
            "insert must NOT be called when delete fails"
        );
    }

    #[tokio::test]
    async fn update_propagates_insert_failure() {
        let store = TestVectorStore::with_fail_insert();
        let id = Uuid::new_v4();
        let result = store
            .update(
                id,
                SubstrateKind::Entity,
                "ns:test",
                "body",
                vec![vec![0.1, 0.2]],
            )
            .await;
        assert!(result.is_err());
        assert!(
            store.delete_called.load(Ordering::SeqCst),
            "delete must run before the failing insert"
        );
        assert!(
            store.insert_called.load(Ordering::SeqCst),
            "insert must be attempted"
        );
        assert!(
            store.insert_saw_delete.load(Ordering::SeqCst),
            "insert must be attempted after delete"
        );
    }

    #[test]
    fn vector_metadata_filter_is_empty_with_property_filters() {
        use crate::types::{PropertyFilter, PropertyOp};

        let empty = VectorMetadataFilter::default();
        assert!(empty.is_empty());

        let with_ns = VectorMetadataFilter {
            namespaces: vec!["ns".into()],
            ..Default::default()
        };
        assert!(!with_ns.is_empty());

        let with_prop = VectorMetadataFilter {
            property_filters: vec![PropertyFilter {
                key: "k".into(),
                op: PropertyOp::Eq,
                value: serde_json::Value::Bool(true),
            }],
            ..Default::default()
        };
        assert!(!with_prop.is_empty());
    }
}
576}