Skip to main content

khive_storage/
vectors.rs

1//! Vector embedding storage and similarity search capability (ADR-024, ADR-041).
2
3use std::collections::HashSet;
4use std::sync::OnceLock;
5
6use async_trait::async_trait;
7use uuid::Uuid;
8
9use khive_types::SubstrateKind;
10
11use crate::capability::StorageCapability;
12use crate::error::StorageError;
13use crate::types::{
14    BatchWriteSummary, IndexRebuildScope, OrphanSweepConfig, OrphanSweepResult, StorageResult,
15    VectorIndexKind, VectorMetadataFilter, VectorRecord, VectorSearchHit, VectorSearchRequest,
16    VectorStoreCapabilities, VectorStoreInfo,
17};
18
19#[async_trait]
20pub trait VectorStore: Send + Sync + 'static {
21    // --- Existing methods (unchanged) ---
22
23    async fn insert(
24        &self,
25        subject_id: Uuid,
26        kind: SubstrateKind,
27        namespace: &str,
28        field: &str,
29        vectors: Vec<Vec<f32>>,
30    ) -> StorageResult<()>;
31    async fn insert_batch(&self, records: Vec<VectorRecord>) -> StorageResult<BatchWriteSummary>;
32    async fn delete(&self, subject_id: Uuid) -> StorageResult<bool>;
33    async fn count(&self) -> StorageResult<u64>;
34    async fn search(&self, request: VectorSearchRequest) -> StorageResult<Vec<VectorSearchHit>>;
35    async fn info(&self) -> StorageResult<VectorStoreInfo>;
36    async fn rebuild(&self, scope: IndexRebuildScope) -> StorageResult<VectorStoreInfo>;
37
38    // --- New methods (default impls; backends opt in by overriding) ---
39
40    /// Declare what this backend supports (called at runtime policy construction).
41    ///
42    /// Default returns a conservative baseline with all optional features disabled,
43    /// preserving backward compatibility for existing implementations. Backends that
44    /// support filter pushdown, batch search, quantization, or in-place update should
45    /// override this and return their own `&'static VectorStoreCapabilities`.
46    fn capabilities(&self) -> &'static VectorStoreCapabilities {
47        static BASELINE: OnceLock<VectorStoreCapabilities> = OnceLock::new();
48        BASELINE.get_or_init(|| VectorStoreCapabilities {
49            supports_filter: false,
50            supports_batch_search: false,
51            supports_quantization: false,
52            supports_update: false,
53            supports_orphan_sweep: false,
54            supports_multi_field: false,
55            // sqlite-vec 0.1.9 enforces SQLITE_VEC_VEC0_MAX_DIMENSIONS = 8192.
56            // The baseline uses the same value so generic callers that have not
57            // overridden capabilities() report the correct ceiling.
58            max_dimensions: Some(8192),
59            index_kinds: vec![VectorIndexKind::SqliteVec],
60        })
61    }
62
63    /// Search with metadata pre-filter.
64    ///
65    /// Default: delegates to [`search`] when the filter carries no predicates;
66    /// returns [`StorageError::Unsupported`] otherwise. Backends with native filter
67    /// pushdown should override this method and set `supports_filter = true` in their
68    /// [`VectorStoreCapabilities`].
69    ///
70    /// Callers must check `capabilities().supports_filter` before calling; the
71    /// runtime layer is responsible for post-filtering when native pushdown is absent.
72    ///
73    /// A backend that claims `supports_filter = true` but does not override this
74    /// method will trigger a `debug_assert` at runtime (ADR-044 ยง4).
75    async fn search_with_filter(
76        &self,
77        request: &VectorSearchRequest,
78        filter: &VectorMetadataFilter,
79    ) -> StorageResult<Vec<VectorSearchHit>> {
80        if filter.is_empty() {
81            return self.search(request.clone()).await;
82        }
83        debug_assert!(
84            !self.capabilities().supports_filter,
85            "backend claims supports_filter=true but did not override search_with_filter"
86        );
87        Err(StorageError::Unsupported {
88            capability: StorageCapability::Vectors,
89            operation: "search_with_filter".into(),
90            message: "filter pushdown not supported; set supports_filter=true only when overriding this method".into(),
91        })
92    }
93
94    /// Search with N query vectors in one round-trip (HyDE fan-out, multi-query).
95    ///
96    /// Default: sequential calls to [`search`], isolating per-query errors so one
97    /// bad request does not abort the batch. Backends that support native batch
98    /// search should override this and set `supports_batch_search = true`.
99    async fn search_batch(
100        &self,
101        requests: &[VectorSearchRequest],
102    ) -> StorageResult<Vec<StorageResult<Vec<VectorSearchHit>>>> {
103        let mut out = Vec::with_capacity(requests.len());
104        for req in requests {
105            out.push(self.search(req.clone()).await);
106        }
107        Ok(out)
108    }
109
110    /// Re-embed an existing entry in place.
111    ///
112    /// Default: delete then insert. Backends that support atomic in-place update
113    /// should override this and set `supports_update = true` in their
114    /// [`VectorStoreCapabilities`].
115    async fn update(
116        &self,
117        subject_id: Uuid,
118        kind: SubstrateKind,
119        namespace: &str,
120        field: &str,
121        vectors: Vec<Vec<f32>>,
122    ) -> StorageResult<()> {
123        self.delete(subject_id).await?;
124        self.insert(subject_id, kind, namespace, field, vectors)
125            .await
126    }
127
128    /// Remove vectors with no live subject (orphan sweep, ADR-044).
129    ///
130    /// Default returns [`StorageError::Unsupported`]. Backends that implement
131    /// deletion must set `supports_orphan_sweep = true` and override this method.
132    async fn orphan_sweep(&self, config: &OrphanSweepConfig) -> StorageResult<OrphanSweepResult> {
133        let _ = config;
134        Err(StorageError::Unsupported {
135            capability: StorageCapability::Vectors,
136            operation: "orphan_sweep".into(),
137            message: "this backend does not support orphan sweep".into(),
138        })
139    }
140
141    /// Check which of the given subject IDs already have embeddings in this store
142    /// for the specified namespace.
143    ///
144    /// Returns a [`HashSet`] of IDs that are present. IDs not in the returned set
145    /// have no embedding. Default returns [`StorageError::Unsupported`]; backends
146    /// that support fast bulk existence checks should override this method.
147    async fn batch_exists(&self, ids: &[Uuid], namespace: &str) -> StorageResult<HashSet<Uuid>> {
148        let _ = (ids, namespace);
149        Err(StorageError::Unsupported {
150            capability: StorageCapability::Vectors,
151            operation: "batch_exists".into(),
152            message: "this backend does not support batch existence checks".into(),
153        })
154    }
155}
156
#[cfg(test)]
mod tests {
    //! Tests for the default (provided) methods of [`VectorStore`].
    //!
    //! Every test drives a small in-memory fake that implements only the
    //! required methods, so whatever is observed comes from the trait defaults.

    use std::sync::Mutex;

    use uuid::Uuid;

    use khive_types::SubstrateKind;

    use super::*;
    use crate::error::StorageError;
    use crate::types::{
        BatchWriteSummary, IndexRebuildScope, OrphanSweepConfig, VectorIndexKind,
        VectorMetadataFilter, VectorSearchHit, VectorSearchRequest, VectorStoreInfo,
    };

    // -- Shared helpers --

    /// Build a search request around `query_vectors`; every optional field is unset.
    ///
    /// `top_k` is fixed because none of the fakes in this module look at it.
    fn sample_request(query_vectors: Vec<Vec<f32>>) -> VectorSearchRequest {
        VectorSearchRequest {
            query_vectors,
            top_k: 5,
            namespace: None,
            kind: None,
            embedding_model: None,
            filter: None,
            backend_hints: None,
        }
    }

    /// Store info returned by the fakes; only `model_name` varies between them.
    fn sample_info(model_name: &'static str) -> VectorStoreInfo {
        VectorStoreInfo {
            model_name: model_name.into(),
            dimensions: 4,
            index_kind: VectorIndexKind::SqliteVec,
            entry_count: 0,
            needs_rebuild: false,
            last_rebuild_at: None,
        }
    }

    /// The error the fakes return when told to fail `operation`.
    fn injected_failure(operation: &'static str, message: &'static str) -> StorageError {
        StorageError::InvalidInput {
            capability: StorageCapability::Vectors,
            operation: operation.into(),
            message: message.into(),
        }
    }

    // -- Minimal test fake --

    /// Fake backend that records the order of `delete`/`insert` calls and can be
    /// told to fail either of them.
    struct TestVectorStore {
        /// When `true`, `delete` returns an error.
        fail_delete: bool,
        /// When `true`, `insert` returns an error.
        fail_insert: bool,
        /// Names of the mutating calls received so far, in call order.
        calls: Mutex<Vec<&'static str>>,
    }

    impl TestVectorStore {
        /// A store on which every operation succeeds.
        fn new() -> Self {
            Self {
                fail_delete: false,
                fail_insert: false,
                calls: Mutex::new(Vec::new()),
            }
        }

        /// A store whose `delete` always fails.
        fn with_fail_delete() -> Self {
            Self {
                fail_delete: true,
                ..Self::new()
            }
        }

        /// A store whose `insert` always fails.
        fn with_fail_insert() -> Self {
            Self {
                fail_insert: true,
                ..Self::new()
            }
        }

        /// Append `operation` to the call log.
        ///
        /// The lock is taken and released inside this synchronous helper, so no
        /// guard is ever held across an `.await`.
        fn record(&self, operation: &'static str) {
            self.calls
                .lock()
                .expect("call log mutex poisoned")
                .push(operation);
        }

        /// Snapshot of the call log, oldest call first.
        fn call_log(&self) -> Vec<&'static str> {
            self.calls.lock().expect("call log mutex poisoned").clone()
        }
    }

    #[async_trait]
    impl VectorStore for TestVectorStore {
        async fn insert(
            &self,
            _subject_id: Uuid,
            _kind: SubstrateKind,
            _namespace: &str,
            _field: &str,
            _vectors: Vec<Vec<f32>>,
        ) -> StorageResult<()> {
            // Record the attempt before failing so tests can see it happened.
            self.record("insert");
            if self.fail_insert {
                return Err(injected_failure("insert", "injected insert failure"));
            }
            Ok(())
        }

        async fn insert_batch(
            &self,
            records: Vec<VectorRecord>,
        ) -> StorageResult<BatchWriteSummary> {
            Ok(BatchWriteSummary {
                attempted: records.len() as u64,
                affected: records.len() as u64,
                failed: 0,
                first_error: String::new(),
            })
        }

        async fn delete(&self, _subject_id: Uuid) -> StorageResult<bool> {
            // Record the attempt before failing so tests can see it happened.
            self.record("delete");
            if self.fail_delete {
                return Err(injected_failure("delete", "injected delete failure"));
            }
            Ok(true)
        }

        async fn count(&self) -> StorageResult<u64> {
            Ok(0)
        }

        /// Always returns exactly one hit, whatever the request.
        async fn search(
            &self,
            _request: VectorSearchRequest,
        ) -> StorageResult<Vec<VectorSearchHit>> {
            Ok(vec![VectorSearchHit {
                subject_id: Uuid::nil(),
                score: khive_score::DeterministicScore::from_f64(0.9),
                rank: 1,
            }])
        }

        async fn info(&self) -> StorageResult<VectorStoreInfo> {
            Ok(sample_info("test"))
        }

        async fn rebuild(&self, _scope: IndexRebuildScope) -> StorageResult<VectorStoreInfo> {
            self.info().await
        }
    }

    // -- Test cases --

    #[tokio::test]
    async fn capabilities_returns_baseline_defaults() {
        let store = TestVectorStore::new();
        let caps = store.capabilities();
        assert!(!caps.supports_filter);
        assert!(!caps.supports_batch_search);
        assert!(!caps.supports_quantization);
        assert!(!caps.supports_update);
        assert!(!caps.supports_orphan_sweep);
        assert!(!caps.supports_multi_field);
        // Baseline reports the sqlite-vec hard limit (SQLITE_VEC_VEC0_MAX_DIMENSIONS = 8192).
        assert_eq!(caps.max_dimensions, Some(8192));
        assert_eq!(caps.index_kinds, vec![VectorIndexKind::SqliteVec]);
    }

    /// Regression: baseline max_dimensions must be 8192 (SQLITE_VEC_VEC0_MAX_DIMENSIONS),
    /// not 4096 (SQLITE_VEC_VEC0_K_MAX). Callers with 5000-dim embeddings must not be
    /// falsely told the default backend is incapable.
    #[tokio::test]
    async fn baseline_max_dimensions_is_sqlite_vec_hard_limit() {
        let store = TestVectorStore::new();
        let caps = store.capabilities();
        let max = caps
            .max_dimensions
            .expect("baseline must declare a finite dimension limit");
        assert!(
            max >= 8192,
            "baseline max_dimensions ({max}) must be at least 8192 — SQLITE_VEC_VEC0_MAX_DIMENSIONS"
        );
    }

    #[tokio::test]
    async fn search_with_filter_empty_filter_delegates_to_search() {
        let store = TestVectorStore::new();
        let req = sample_request(vec![vec![0.1, 0.2, 0.3, 0.4]]);
        let filter = VectorMetadataFilter::default(); // all fields empty
        let result = store.search_with_filter(&req, &filter).await;
        assert!(result.is_ok());
        let hits = result.unwrap();
        // search() on TestVectorStore returns exactly one hit
        assert_eq!(hits.len(), 1);
    }

    #[tokio::test]
    async fn search_with_filter_non_empty_filter_returns_unsupported() {
        let store = TestVectorStore::new();
        let req = sample_request(vec![vec![0.1, 0.2, 0.3, 0.4]]);
        let filter = VectorMetadataFilter {
            namespaces: vec!["ns:agent".into()],
            kinds: vec![],
            property_filters: vec![],
        };
        let result = store.search_with_filter(&req, &filter).await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            matches!(err, StorageError::Unsupported { .. }),
            "expected Unsupported, got {err:?}"
        );
    }

    #[tokio::test]
    async fn search_batch_returns_one_result_per_request() {
        let store = TestVectorStore::new();
        let requests = vec![
            sample_request(vec![vec![0.1, 0.2, 0.3, 0.4]]),
            sample_request(vec![vec![0.5, 0.6, 0.7, 0.8]]),
        ];
        let result = store.search_batch(&requests).await;
        assert!(result.is_ok());
        let batched = result.unwrap();
        assert_eq!(batched.len(), 2, "should return one result set per request");
        for inner in &batched {
            assert!(inner.is_ok(), "each inner result should be Ok");
            assert_eq!(
                inner.as_ref().unwrap().len(),
                1,
                "each Ok should have one hit"
            );
        }
    }

    #[tokio::test]
    async fn search_batch_isolates_per_query_errors() {
        // A store whose search fails only for requests without query vectors.
        // Sandwiching a bad request between two good ones proves the batch keeps
        // going after a failure — a single always-failing request could not tell
        // that apart from "stop at the first error".
        struct FlakySearch;

        #[async_trait]
        impl VectorStore for FlakySearch {
            async fn insert(
                &self,
                _: Uuid,
                _: SubstrateKind,
                _: &str,
                _: &str,
                _: Vec<Vec<f32>>,
            ) -> StorageResult<()> {
                Ok(())
            }
            async fn insert_batch(&self, _: Vec<VectorRecord>) -> StorageResult<BatchWriteSummary> {
                Ok(BatchWriteSummary::default())
            }
            async fn delete(&self, _: Uuid) -> StorageResult<bool> {
                Ok(false)
            }
            async fn count(&self) -> StorageResult<u64> {
                Ok(0)
            }
            async fn search(
                &self,
                request: VectorSearchRequest,
            ) -> StorageResult<Vec<VectorSearchHit>> {
                if request.query_vectors.is_empty() {
                    return Err(injected_failure("search", "injected search failure"));
                }
                Ok(Vec::new())
            }
            async fn info(&self) -> StorageResult<VectorStoreInfo> {
                Ok(sample_info("fail"))
            }
            async fn rebuild(&self, _: IndexRebuildScope) -> StorageResult<VectorStoreInfo> {
                self.info().await
            }
        }

        let store = FlakySearch;
        let requests = vec![
            sample_request(vec![vec![0.1]]),
            sample_request(Vec::new()), // rejected by FlakySearch
            sample_request(vec![vec![0.2]]),
        ];
        // Outer result is Ok; the error is in the inner vec.
        let result = store.search_batch(&requests).await;
        assert!(result.is_ok(), "outer result must be Ok for batch");
        let batched = result.unwrap();
        assert_eq!(batched.len(), 3, "one slot per request, failures included");
        assert!(batched[0].is_ok(), "request before the failure must succeed");
        assert!(
            matches!(&batched[1], Err(StorageError::InvalidInput { .. })),
            "inner result must carry the injected error"
        );
        assert!(
            batched[2].is_ok(),
            "request after the failure must still be executed"
        );
    }

    #[tokio::test]
    async fn orphan_sweep_default_returns_unsupported() {
        let store = TestVectorStore::new();
        let config = OrphanSweepConfig {
            subject_id_allowlist: None,
            namespaces: vec![],
            substrate_kinds: vec![],
            max_delete: 100,
            dry_run: true,
        };
        let result = store.orphan_sweep(&config).await;
        assert!(
            matches!(result, Err(StorageError::Unsupported { .. })),
            "expected Unsupported, got {result:?}"
        );
        assert!(!store.capabilities().supports_orphan_sweep);
    }

    #[tokio::test]
    async fn update_calls_delete_then_insert() {
        let store = TestVectorStore::new();
        let id = Uuid::new_v4();
        let result = store
            .update(
                id,
                SubstrateKind::Entity,
                "ns:test",
                "body",
                vec![vec![0.1, 0.2]],
            )
            .await;
        assert!(result.is_ok());
        // The log is ordered, so this also rejects insert-before-delete.
        assert_eq!(
            store.call_log(),
            vec!["delete", "insert"],
            "update must call delete exactly once, then insert exactly once"
        );
    }

    #[tokio::test]
    async fn update_propagates_delete_failure() {
        let store = TestVectorStore::with_fail_delete();
        let id = Uuid::new_v4();
        let result = store
            .update(
                id,
                SubstrateKind::Entity,
                "ns:test",
                "body",
                vec![vec![0.1, 0.2]],
            )
            .await;
        assert!(
            matches!(result, Err(StorageError::InvalidInput { .. })),
            "expected the injected delete error, got {result:?}"
        );
        assert_eq!(
            store.call_log(),
            vec!["delete"],
            "delete must be attempted and insert must NOT be called when delete fails"
        );
    }

    #[tokio::test]
    async fn update_propagates_insert_failure() {
        let store = TestVectorStore::with_fail_insert();
        let id = Uuid::new_v4();
        let result = store
            .update(
                id,
                SubstrateKind::Entity,
                "ns:test",
                "body",
                vec![vec![0.1, 0.2]],
            )
            .await;
        assert!(
            matches!(result, Err(StorageError::InvalidInput { .. })),
            "expected the injected insert error, got {result:?}"
        );
        // The default update is delete-then-insert with no rollback, so by the
        // time insert fails the delete has already been issued.
        assert_eq!(
            store.call_log(),
            vec!["delete", "insert"],
            "insert must be attempted only after delete"
        );
    }

    #[test]
    fn vector_metadata_filter_is_empty_with_property_filters() {
        use crate::types::{PropertyFilter, PropertyOp};

        let empty = VectorMetadataFilter::default();
        assert!(empty.is_empty());

        let with_ns = VectorMetadataFilter {
            namespaces: vec!["ns".into()],
            ..Default::default()
        };
        assert!(!with_ns.is_empty());

        let with_prop = VectorMetadataFilter {
            property_filters: vec![PropertyFilter {
                key: "k".into(),
                op: PropertyOp::Eq,
                value: serde_json::Value::Bool(true),
            }],
            ..Default::default()
        };
        assert!(!with_prop.is_empty());
    }
}