1use std::sync::OnceLock;
4
5use async_trait::async_trait;
6use uuid::Uuid;
7
8use khive_types::SubstrateKind;
9
10use crate::capability::StorageCapability;
11use crate::error::StorageError;
12use crate::types::{
13 BatchWriteSummary, IndexRebuildScope, OrphanSweepConfig, OrphanSweepResult, StorageResult,
14 VectorIndexKind, VectorMetadataFilter, VectorRecord, VectorSearchHit, VectorSearchRequest,
15 VectorStoreCapabilities, VectorStoreInfo,
16};
17
18#[async_trait]
19pub trait VectorStore: Send + Sync + 'static {
20 async fn insert(
23 &self,
24 subject_id: Uuid,
25 kind: SubstrateKind,
26 namespace: &str,
27 field: &str,
28 vectors: Vec<Vec<f32>>,
29 ) -> StorageResult<()>;
30 async fn insert_batch(&self, records: Vec<VectorRecord>) -> StorageResult<BatchWriteSummary>;
31 async fn delete(&self, subject_id: Uuid) -> StorageResult<bool>;
32 async fn count(&self) -> StorageResult<u64>;
33 async fn search(&self, request: VectorSearchRequest) -> StorageResult<Vec<VectorSearchHit>>;
34 async fn info(&self) -> StorageResult<VectorStoreInfo>;
35 async fn rebuild(&self, scope: IndexRebuildScope) -> StorageResult<VectorStoreInfo>;
36
37 fn capabilities(&self) -> &'static VectorStoreCapabilities {
46 static BASELINE: OnceLock<VectorStoreCapabilities> = OnceLock::new();
47 BASELINE.get_or_init(|| VectorStoreCapabilities {
48 supports_filter: false,
49 supports_batch_search: false,
50 supports_quantization: false,
51 supports_update: false,
52 supports_orphan_sweep: false,
53 supports_multi_field: false,
54 max_dimensions: Some(8192),
58 index_kinds: vec![VectorIndexKind::SqliteVec],
59 })
60 }
61
62 async fn search_with_filter(
75 &self,
76 request: &VectorSearchRequest,
77 filter: &VectorMetadataFilter,
78 ) -> StorageResult<Vec<VectorSearchHit>> {
79 if filter.is_empty() {
80 return self.search(request.clone()).await;
81 }
82 debug_assert!(
83 !self.capabilities().supports_filter,
84 "backend claims supports_filter=true but did not override search_with_filter"
85 );
86 Err(StorageError::Unsupported {
87 capability: StorageCapability::Vectors,
88 operation: "search_with_filter".into(),
89 message: "filter pushdown not supported; set supports_filter=true only when overriding this method".into(),
90 })
91 }
92
93 async fn search_batch(
99 &self,
100 requests: &[VectorSearchRequest],
101 ) -> StorageResult<Vec<StorageResult<Vec<VectorSearchHit>>>> {
102 let mut out = Vec::with_capacity(requests.len());
103 for req in requests {
104 out.push(self.search(req.clone()).await);
105 }
106 Ok(out)
107 }
108
109 async fn update(
115 &self,
116 subject_id: Uuid,
117 kind: SubstrateKind,
118 namespace: &str,
119 field: &str,
120 vectors: Vec<Vec<f32>>,
121 ) -> StorageResult<()> {
122 self.delete(subject_id).await?;
123 self.insert(subject_id, kind, namespace, field, vectors)
124 .await
125 }
126
127 async fn orphan_sweep(&self, config: &OrphanSweepConfig) -> StorageResult<OrphanSweepResult> {
132 let _ = config;
133 Err(StorageError::Unsupported {
134 capability: StorageCapability::Vectors,
135 operation: "orphan_sweep".into(),
136 message: "this backend does not support orphan sweep".into(),
137 })
138 }
139}
140
141#[cfg(test)]
142mod tests {
143 use std::sync::atomic::{AtomicBool, Ordering};
144
145 use uuid::Uuid;
146
147 use khive_types::SubstrateKind;
148
149 use super::*;
150 use crate::error::StorageError;
151 use crate::types::{
152 BatchWriteSummary, IndexRebuildScope, OrphanSweepConfig, VectorIndexKind,
153 VectorMetadataFilter, VectorSearchHit, VectorSearchRequest, VectorStoreInfo,
154 };
155
156 struct TestVectorStore {
159 fail_delete: AtomicBool,
161 fail_insert: AtomicBool,
163 delete_called: AtomicBool,
165 insert_called: AtomicBool,
167 }
168
169 impl TestVectorStore {
170 fn new() -> Self {
171 Self {
172 fail_delete: AtomicBool::new(false),
173 fail_insert: AtomicBool::new(false),
174 delete_called: AtomicBool::new(false),
175 insert_called: AtomicBool::new(false),
176 }
177 }
178
179 fn with_fail_delete() -> Self {
180 let s = Self::new();
181 s.fail_delete.store(true, Ordering::SeqCst);
182 s
183 }
184
185 fn with_fail_insert() -> Self {
186 let s = Self::new();
187 s.fail_insert.store(true, Ordering::SeqCst);
188 s
189 }
190 }
191
192 #[async_trait]
193 impl VectorStore for TestVectorStore {
194 async fn insert(
195 &self,
196 _subject_id: Uuid,
197 _kind: SubstrateKind,
198 _namespace: &str,
199 _field: &str,
200 _vectors: Vec<Vec<f32>>,
201 ) -> StorageResult<()> {
202 self.insert_called.store(true, Ordering::SeqCst);
203 if self.fail_insert.load(Ordering::SeqCst) {
204 return Err(StorageError::InvalidInput {
205 capability: StorageCapability::Vectors,
206 operation: "insert".into(),
207 message: "injected insert failure".into(),
208 });
209 }
210 Ok(())
211 }
212
213 async fn insert_batch(
214 &self,
215 records: Vec<VectorRecord>,
216 ) -> StorageResult<BatchWriteSummary> {
217 Ok(BatchWriteSummary {
218 attempted: records.len() as u64,
219 affected: records.len() as u64,
220 failed: 0,
221 first_error: String::new(),
222 })
223 }
224
225 async fn delete(&self, _subject_id: Uuid) -> StorageResult<bool> {
226 self.delete_called.store(true, Ordering::SeqCst);
227 if self.fail_delete.load(Ordering::SeqCst) {
228 return Err(StorageError::InvalidInput {
229 capability: StorageCapability::Vectors,
230 operation: "delete".into(),
231 message: "injected delete failure".into(),
232 });
233 }
234 Ok(true)
235 }
236
237 async fn count(&self) -> StorageResult<u64> {
238 Ok(0)
239 }
240
241 async fn search(
242 &self,
243 _request: VectorSearchRequest,
244 ) -> StorageResult<Vec<VectorSearchHit>> {
245 Ok(vec![VectorSearchHit {
246 subject_id: Uuid::nil(),
247 score: khive_score::DeterministicScore::from_f64(0.9),
248 rank: 1,
249 }])
250 }
251
252 async fn info(&self) -> StorageResult<VectorStoreInfo> {
253 Ok(VectorStoreInfo {
254 model_name: "test".into(),
255 dimensions: 4,
256 index_kind: VectorIndexKind::SqliteVec,
257 entry_count: 0,
258 needs_rebuild: false,
259 last_rebuild_at: None,
260 })
261 }
262
263 async fn rebuild(&self, _scope: IndexRebuildScope) -> StorageResult<VectorStoreInfo> {
264 self.info().await
265 }
266 }
267
268 #[tokio::test]
271 async fn capabilities_returns_baseline_defaults() {
272 let store = TestVectorStore::new();
273 let caps = store.capabilities();
274 assert!(!caps.supports_filter);
275 assert!(!caps.supports_batch_search);
276 assert!(!caps.supports_quantization);
277 assert!(!caps.supports_update);
278 assert!(!caps.supports_orphan_sweep);
279 assert_eq!(caps.max_dimensions, Some(8192));
281 assert_eq!(caps.index_kinds, vec![VectorIndexKind::SqliteVec]);
282 }
283
284 #[tokio::test]
288 async fn baseline_max_dimensions_is_sqlite_vec_hard_limit() {
289 let store = TestVectorStore::new();
290 let caps = store.capabilities();
291 let max = caps
292 .max_dimensions
293 .expect("baseline must declare a finite dimension limit");
294 assert!(
295 max >= 8192,
296 "baseline max_dimensions ({max}) must be at least 8192 โ SQLITE_VEC_VEC0_MAX_DIMENSIONS"
297 );
298 }
299
300 #[tokio::test]
303 async fn search_with_filter_empty_filter_delegates_to_search() {
304 let store = TestVectorStore::new();
305 let req = VectorSearchRequest {
306 query_vectors: vec![vec![0.1, 0.2, 0.3, 0.4]],
307 top_k: 5,
308 namespace: None,
309 kind: None,
310 filter: None,
311 backend_hints: None,
312 };
313 let filter = VectorMetadataFilter::default(); let result = store.search_with_filter(&req, &filter).await;
315 assert!(result.is_ok());
316 let hits = result.unwrap();
317 assert_eq!(hits.len(), 1);
319 }
320
321 #[tokio::test]
322 async fn search_with_filter_non_empty_filter_returns_unsupported() {
323 let store = TestVectorStore::new();
324 let req = VectorSearchRequest {
325 query_vectors: vec![vec![0.1, 0.2, 0.3, 0.4]],
326 top_k: 5,
327 namespace: None,
328 kind: None,
329 filter: None,
330 backend_hints: None,
331 };
332 let filter = VectorMetadataFilter {
333 namespaces: vec!["ns:agent".into()],
334 kinds: vec![],
335 property_filters: vec![],
336 };
337 let result = store.search_with_filter(&req, &filter).await;
338 assert!(result.is_err());
339 let err = result.unwrap_err();
340 assert!(
341 matches!(err, StorageError::Unsupported { .. }),
342 "expected Unsupported, got {err:?}"
343 );
344 }
345
346 #[tokio::test]
347 async fn search_batch_returns_one_result_per_request() {
348 let store = TestVectorStore::new();
349 let requests = vec![
350 VectorSearchRequest {
351 query_vectors: vec![vec![0.1, 0.2, 0.3, 0.4]],
352 top_k: 3,
353 namespace: None,
354 kind: None,
355 filter: None,
356 backend_hints: None,
357 },
358 VectorSearchRequest {
359 query_vectors: vec![vec![0.5, 0.6, 0.7, 0.8]],
360 top_k: 3,
361 namespace: None,
362 kind: None,
363 filter: None,
364 backend_hints: None,
365 },
366 ];
367 let result = store.search_batch(&requests).await;
368 assert!(result.is_ok());
369 let batched = result.unwrap();
370 assert_eq!(batched.len(), 2, "should return one result set per request");
371 for inner in &batched {
372 assert!(inner.is_ok(), "each inner result should be Ok");
373 assert_eq!(
374 inner.as_ref().unwrap().len(),
375 1,
376 "each Ok should have one hit"
377 );
378 }
379 }
380
381 #[tokio::test]
382 async fn search_batch_isolates_per_query_errors() {
383 struct FailingSearch;
386
387 #[async_trait]
388 impl VectorStore for FailingSearch {
389 async fn insert(
390 &self,
391 _: Uuid,
392 _: SubstrateKind,
393 _: &str,
394 _: &str,
395 _: Vec<Vec<f32>>,
396 ) -> StorageResult<()> {
397 Ok(())
398 }
399 async fn insert_batch(&self, _: Vec<VectorRecord>) -> StorageResult<BatchWriteSummary> {
400 Ok(BatchWriteSummary::default())
401 }
402 async fn delete(&self, _: Uuid) -> StorageResult<bool> {
403 Ok(false)
404 }
405 async fn count(&self) -> StorageResult<u64> {
406 Ok(0)
407 }
408 async fn search(&self, _: VectorSearchRequest) -> StorageResult<Vec<VectorSearchHit>> {
409 Err(StorageError::InvalidInput {
410 capability: StorageCapability::Vectors,
411 operation: "search".into(),
412 message: "injected search failure".into(),
413 })
414 }
415 async fn info(&self) -> StorageResult<VectorStoreInfo> {
416 Ok(VectorStoreInfo {
417 model_name: "fail".into(),
418 dimensions: 4,
419 index_kind: VectorIndexKind::SqliteVec,
420 entry_count: 0,
421 needs_rebuild: false,
422 last_rebuild_at: None,
423 })
424 }
425 async fn rebuild(&self, _: IndexRebuildScope) -> StorageResult<VectorStoreInfo> {
426 self.info().await
427 }
428 }
429
430 let store = FailingSearch;
431 let requests = vec![VectorSearchRequest {
432 query_vectors: vec![vec![0.1]],
433 top_k: 1,
434 namespace: None,
435 kind: None,
436 filter: None,
437 backend_hints: None,
438 }];
439 let result = store.search_batch(&requests).await;
441 assert!(result.is_ok(), "outer result must be Ok for batch");
442 let batched = result.unwrap();
443 assert_eq!(batched.len(), 1);
444 assert!(batched[0].is_err(), "inner result must carry the error");
445 }
446
447 #[tokio::test]
448 async fn orphan_sweep_default_returns_unsupported() {
449 let store = TestVectorStore::new();
450 let config = OrphanSweepConfig {
451 subject_id_allowlist: None,
452 namespaces: vec![],
453 substrate_kinds: vec![],
454 max_delete: 100,
455 dry_run: true,
456 };
457 let result = store.orphan_sweep(&config).await;
458 assert!(
459 matches!(result, Err(StorageError::Unsupported { .. })),
460 "expected Unsupported, got {result:?}"
461 );
462 assert!(!store.capabilities().supports_orphan_sweep);
463 }
464
465 #[tokio::test]
466 async fn update_calls_delete_then_insert() {
467 let store = TestVectorStore::new();
468 let id = Uuid::new_v4();
469 let result = store
470 .update(
471 id,
472 SubstrateKind::Entity,
473 "ns:test",
474 "body",
475 vec![vec![0.1, 0.2]],
476 )
477 .await;
478 assert!(result.is_ok());
479 assert!(
480 store.delete_called.load(Ordering::SeqCst),
481 "delete must be called"
482 );
483 assert!(
484 store.insert_called.load(Ordering::SeqCst),
485 "insert must be called after delete"
486 );
487 }
488
489 #[tokio::test]
490 async fn update_propagates_delete_failure() {
491 let store = TestVectorStore::with_fail_delete();
492 let id = Uuid::new_v4();
493 let result = store
494 .update(
495 id,
496 SubstrateKind::Entity,
497 "ns:test",
498 "body",
499 vec![vec![0.1, 0.2]],
500 )
501 .await;
502 assert!(result.is_err());
503 assert!(
504 store.delete_called.load(Ordering::SeqCst),
505 "delete must be attempted"
506 );
507 assert!(
508 !store.insert_called.load(Ordering::SeqCst),
509 "insert must NOT be called when delete fails"
510 );
511 }
512
513 #[tokio::test]
514 async fn update_propagates_insert_failure() {
515 let store = TestVectorStore::with_fail_insert();
516 let id = Uuid::new_v4();
517 let result = store
518 .update(
519 id,
520 SubstrateKind::Entity,
521 "ns:test",
522 "body",
523 vec![vec![0.1, 0.2]],
524 )
525 .await;
526 assert!(result.is_err());
527 assert!(
528 store.insert_called.load(Ordering::SeqCst),
529 "insert must be attempted"
530 );
531 }
532
533 #[tokio::test]
534 async fn vector_metadata_filter_is_empty_with_property_filters() {
535 let empty = VectorMetadataFilter::default();
536 assert!(empty.is_empty());
537
538 let with_ns = VectorMetadataFilter {
539 namespaces: vec!["ns".into()],
540 ..Default::default()
541 };
542 assert!(!with_ns.is_empty());
543
544 use crate::types::{PropertyFilter, PropertyOp};
545 let with_prop = VectorMetadataFilter {
546 property_filters: vec![PropertyFilter {
547 key: "k".into(),
548 op: PropertyOp::Eq,
549 value: serde_json::Value::Bool(true),
550 }],
551 ..Default::default()
552 };
553 assert!(!with_prop.is_empty());
554 }
555}