Skip to main content

khive_storage/
vectors.rs

1//! Vector embedding storage and similarity search capability (ADR-024, ADR-041).
2
3use std::sync::OnceLock;
4
5use async_trait::async_trait;
6use uuid::Uuid;
7
8use khive_types::SubstrateKind;
9
10use crate::capability::StorageCapability;
11use crate::error::StorageError;
12use crate::types::{
13    BatchWriteSummary, IndexRebuildScope, OrphanSweepConfig, OrphanSweepResult, StorageResult,
14    VectorIndexKind, VectorMetadataFilter, VectorRecord, VectorSearchHit, VectorSearchRequest,
15    VectorStoreCapabilities, VectorStoreInfo,
16};
17
18#[async_trait]
19pub trait VectorStore: Send + Sync + 'static {
20    // --- Existing methods (unchanged) ---
21
22    async fn insert(
23        &self,
24        subject_id: Uuid,
25        kind: SubstrateKind,
26        namespace: &str,
27        field: &str,
28        vectors: Vec<Vec<f32>>,
29    ) -> StorageResult<()>;
30    async fn insert_batch(&self, records: Vec<VectorRecord>) -> StorageResult<BatchWriteSummary>;
31    async fn delete(&self, subject_id: Uuid) -> StorageResult<bool>;
32    async fn count(&self) -> StorageResult<u64>;
33    async fn search(&self, request: VectorSearchRequest) -> StorageResult<Vec<VectorSearchHit>>;
34    async fn info(&self) -> StorageResult<VectorStoreInfo>;
35    async fn rebuild(&self, scope: IndexRebuildScope) -> StorageResult<VectorStoreInfo>;
36
37    // --- New methods (default impls; backends opt in by overriding) ---
38
39    /// Declare what this backend supports (called at runtime policy construction).
40    ///
41    /// Default returns a conservative baseline with all optional features disabled,
42    /// preserving backward compatibility for existing implementations. Backends that
43    /// support filter pushdown, batch search, quantization, or in-place update should
44    /// override this and return their own `&'static VectorStoreCapabilities`.
45    fn capabilities(&self) -> &'static VectorStoreCapabilities {
46        static BASELINE: OnceLock<VectorStoreCapabilities> = OnceLock::new();
47        BASELINE.get_or_init(|| VectorStoreCapabilities {
48            supports_filter: false,
49            supports_batch_search: false,
50            supports_quantization: false,
51            supports_update: false,
52            supports_orphan_sweep: false,
53            supports_multi_field: false,
54            // sqlite-vec 0.1.9 enforces SQLITE_VEC_VEC0_MAX_DIMENSIONS = 8192.
55            // The baseline uses the same value so generic callers that have not
56            // overridden capabilities() report the correct ceiling.
57            max_dimensions: Some(8192),
58            index_kinds: vec![VectorIndexKind::SqliteVec],
59        })
60    }
61
62    /// Search with metadata pre-filter.
63    ///
64    /// Default: delegates to [`search`] when the filter carries no predicates;
65    /// returns [`StorageError::Unsupported`] otherwise. Backends with native filter
66    /// pushdown should override this method and set `supports_filter = true` in their
67    /// [`VectorStoreCapabilities`].
68    ///
69    /// Callers must check `capabilities().supports_filter` before calling; the
70    /// runtime layer is responsible for post-filtering when native pushdown is absent.
71    ///
72    /// A backend that claims `supports_filter = true` but does not override this
73    /// method will trigger a `debug_assert` at runtime (ADR-044 ยง4).
74    async fn search_with_filter(
75        &self,
76        request: &VectorSearchRequest,
77        filter: &VectorMetadataFilter,
78    ) -> StorageResult<Vec<VectorSearchHit>> {
79        if filter.is_empty() {
80            return self.search(request.clone()).await;
81        }
82        debug_assert!(
83            !self.capabilities().supports_filter,
84            "backend claims supports_filter=true but did not override search_with_filter"
85        );
86        Err(StorageError::Unsupported {
87            capability: StorageCapability::Vectors,
88            operation: "search_with_filter".into(),
89            message: "filter pushdown not supported; set supports_filter=true only when overriding this method".into(),
90        })
91    }
92
93    /// Search with N query vectors in one round-trip (HyDE fan-out, multi-query).
94    ///
95    /// Default: sequential calls to [`search`], isolating per-query errors so one
96    /// bad request does not abort the batch. Backends that support native batch
97    /// search should override this and set `supports_batch_search = true`.
98    async fn search_batch(
99        &self,
100        requests: &[VectorSearchRequest],
101    ) -> StorageResult<Vec<StorageResult<Vec<VectorSearchHit>>>> {
102        let mut out = Vec::with_capacity(requests.len());
103        for req in requests {
104            out.push(self.search(req.clone()).await);
105        }
106        Ok(out)
107    }
108
109    /// Re-embed an existing entry in place.
110    ///
111    /// Default: delete then insert. Backends that support atomic in-place update
112    /// should override this and set `supports_update = true` in their
113    /// [`VectorStoreCapabilities`].
114    async fn update(
115        &self,
116        subject_id: Uuid,
117        kind: SubstrateKind,
118        namespace: &str,
119        field: &str,
120        vectors: Vec<Vec<f32>>,
121    ) -> StorageResult<()> {
122        self.delete(subject_id).await?;
123        self.insert(subject_id, kind, namespace, field, vectors)
124            .await
125    }
126
127    /// Remove vectors with no live subject (orphan sweep, ADR-044).
128    ///
129    /// Default returns [`StorageError::Unsupported`]. Backends that implement
130    /// deletion must set `supports_orphan_sweep = true` and override this method.
131    async fn orphan_sweep(&self, config: &OrphanSweepConfig) -> StorageResult<OrphanSweepResult> {
132        let _ = config;
133        Err(StorageError::Unsupported {
134            capability: StorageCapability::Vectors,
135            operation: "orphan_sweep".into(),
136            message: "this backend does not support orphan sweep".into(),
137        })
138    }
139}
140
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Mutex;

    use uuid::Uuid;

    use khive_types::SubstrateKind;

    use super::*;
    use crate::error::StorageError;
    use crate::types::{
        BatchWriteSummary, IndexRebuildScope, OrphanSweepConfig, PropertyFilter, PropertyOp,
        VectorIndexKind, VectorMetadataFilter, VectorSearchHit, VectorSearchRequest,
        VectorStoreInfo,
    };

    // -- Helpers --

    /// Build a search request for `query` with no namespace/kind/filter limits.
    ///
    /// The fakes in this module ignore request contents, so only the shape matters.
    fn search_request(query: Vec<f32>) -> VectorSearchRequest {
        VectorSearchRequest {
            query_vectors: vec![query],
            top_k: 5,
            namespace: None,
            kind: None,
            filter: None,
            backend_hints: None,
        }
    }

    /// Invoke the default `update` on `store` with fixed, arbitrary arguments.
    async fn run_update(store: &TestVectorStore) -> StorageResult<()> {
        store
            .update(
                Uuid::new_v4(),
                SubstrateKind::Entity,
                "ns:test",
                "body",
                vec![vec![0.1, 0.2]],
            )
            .await
    }

    // -- Minimal test fake --

    /// Fake store that logs mutating calls in order and can be told to fail
    /// `delete` or `insert`.
    struct TestVectorStore {
        /// When `true`, `delete` returns an error.
        fail_delete: AtomicBool,
        /// When `true`, `insert` returns an error.
        fail_insert: AtomicBool,
        /// Ordered log of `"delete"` / `"insert"` calls, so tests can assert call
        /// order rather than only that a call happened.
        call_log: Mutex<Vec<&'static str>>,
    }

    impl TestVectorStore {
        fn new() -> Self {
            Self {
                fail_delete: AtomicBool::new(false),
                fail_insert: AtomicBool::new(false),
                call_log: Mutex::new(Vec::new()),
            }
        }

        fn with_fail_delete() -> Self {
            let s = Self::new();
            s.fail_delete.store(true, Ordering::SeqCst);
            s
        }

        fn with_fail_insert() -> Self {
            let s = Self::new();
            s.fail_insert.store(true, Ordering::SeqCst);
            s
        }

        /// Append `op` to the call log (the guard is dropped before returning).
        fn record(&self, op: &'static str) {
            self.call_log
                .lock()
                .expect("call log mutex poisoned")
                .push(op);
        }

        /// Snapshot of the call log, in call order.
        fn recorded_calls(&self) -> Vec<&'static str> {
            self.call_log
                .lock()
                .expect("call log mutex poisoned")
                .clone()
        }
    }

    #[async_trait]
    impl VectorStore for TestVectorStore {
        async fn insert(
            &self,
            _subject_id: Uuid,
            _kind: SubstrateKind,
            _namespace: &str,
            _field: &str,
            _vectors: Vec<Vec<f32>>,
        ) -> StorageResult<()> {
            self.record("insert");
            if self.fail_insert.load(Ordering::SeqCst) {
                return Err(StorageError::InvalidInput {
                    capability: StorageCapability::Vectors,
                    operation: "insert".into(),
                    message: "injected insert failure".into(),
                });
            }
            Ok(())
        }

        async fn insert_batch(
            &self,
            records: Vec<VectorRecord>,
        ) -> StorageResult<BatchWriteSummary> {
            Ok(BatchWriteSummary {
                attempted: records.len() as u64,
                affected: records.len() as u64,
                failed: 0,
                first_error: String::new(),
            })
        }

        async fn delete(&self, _subject_id: Uuid) -> StorageResult<bool> {
            self.record("delete");
            if self.fail_delete.load(Ordering::SeqCst) {
                return Err(StorageError::InvalidInput {
                    capability: StorageCapability::Vectors,
                    operation: "delete".into(),
                    message: "injected delete failure".into(),
                });
            }
            Ok(true)
        }

        async fn count(&self) -> StorageResult<u64> {
            Ok(0)
        }

        async fn search(
            &self,
            _request: VectorSearchRequest,
        ) -> StorageResult<Vec<VectorSearchHit>> {
            Ok(vec![VectorSearchHit {
                subject_id: Uuid::nil(),
                score: khive_score::DeterministicScore::from_f64(0.9),
                rank: 1,
            }])
        }

        async fn info(&self) -> StorageResult<VectorStoreInfo> {
            Ok(VectorStoreInfo {
                model_name: "test".into(),
                dimensions: 4,
                index_kind: VectorIndexKind::SqliteVec,
                entry_count: 0,
                needs_rebuild: false,
                last_rebuild_at: None,
            })
        }

        async fn rebuild(&self, _scope: IndexRebuildScope) -> StorageResult<VectorStoreInfo> {
            self.info().await
        }
    }

    // -- Test cases --

    #[test]
    fn capabilities_returns_baseline_defaults() {
        let store = TestVectorStore::new();
        let caps = store.capabilities();
        assert!(!caps.supports_filter);
        assert!(!caps.supports_batch_search);
        assert!(!caps.supports_quantization);
        assert!(!caps.supports_update);
        assert!(!caps.supports_orphan_sweep);
        assert!(!caps.supports_multi_field);
        // Baseline reports the sqlite-vec hard limit (SQLITE_VEC_VEC0_MAX_DIMENSIONS = 8192).
        assert_eq!(caps.max_dimensions, Some(8192));
        assert_eq!(caps.index_kinds, vec![VectorIndexKind::SqliteVec]);
    }

    /// Regression: baseline max_dimensions must be 8192 (SQLITE_VEC_VEC0_MAX_DIMENSIONS),
    /// not 4096 (SQLITE_VEC_VEC0_K_MAX). Callers with 5000-dim embeddings must not be
    /// falsely told the default backend is incapable.
    #[test]
    fn baseline_max_dimensions_is_sqlite_vec_hard_limit() {
        let store = TestVectorStore::new();
        let max = store
            .capabilities()
            .max_dimensions
            .expect("baseline must declare a finite dimension limit");
        assert!(
            max >= 8192,
            "baseline max_dimensions ({max}) must be at least 8192 — SQLITE_VEC_VEC0_MAX_DIMENSIONS"
        );
    }

    #[tokio::test]
    async fn search_with_filter_empty_filter_delegates_to_search() {
        let store = TestVectorStore::new();
        let req = search_request(vec![0.1, 0.2, 0.3, 0.4]);
        let filter = VectorMetadataFilter::default(); // all fields empty
        let hits = store
            .search_with_filter(&req, &filter)
            .await
            .expect("an empty filter must delegate to search");
        // search() on TestVectorStore returns exactly one hit.
        assert_eq!(hits.len(), 1);
    }

    #[tokio::test]
    async fn search_with_filter_non_empty_filter_returns_unsupported() {
        let store = TestVectorStore::new();
        let req = search_request(vec![0.1, 0.2, 0.3, 0.4]);
        let filter = VectorMetadataFilter {
            namespaces: vec!["ns:agent".into()],
            kinds: vec![],
            property_filters: vec![],
        };
        let err = store
            .search_with_filter(&req, &filter)
            .await
            .expect_err("a non-empty filter must be rejected by the default impl");
        assert!(
            matches!(err, StorageError::Unsupported { .. }),
            "expected Unsupported, got {err:?}"
        );
    }

    #[tokio::test]
    async fn search_batch_returns_one_result_per_request() {
        let store = TestVectorStore::new();
        let requests = vec![
            search_request(vec![0.1, 0.2, 0.3, 0.4]),
            search_request(vec![0.5, 0.6, 0.7, 0.8]),
        ];
        let batched = store
            .search_batch(&requests)
            .await
            .expect("outer result must be Ok for batch");
        assert_eq!(batched.len(), 2, "should return one result set per request");
        for inner in &batched {
            let hits = inner.as_ref().expect("each inner result should be Ok");
            assert_eq!(hits.len(), 1, "each Ok should have one hit");
        }
    }

    #[tokio::test]
    async fn search_batch_isolates_per_query_errors() {
        // A store that always fails search — the outer Ok must still be returned,
        // and the failed inner result must carry the error.
        struct FailingSearch;

        #[async_trait]
        impl VectorStore for FailingSearch {
            async fn insert(
                &self,
                _: Uuid,
                _: SubstrateKind,
                _: &str,
                _: &str,
                _: Vec<Vec<f32>>,
            ) -> StorageResult<()> {
                Ok(())
            }
            async fn insert_batch(&self, _: Vec<VectorRecord>) -> StorageResult<BatchWriteSummary> {
                Ok(BatchWriteSummary::default())
            }
            async fn delete(&self, _: Uuid) -> StorageResult<bool> {
                Ok(false)
            }
            async fn count(&self) -> StorageResult<u64> {
                Ok(0)
            }
            async fn search(&self, _: VectorSearchRequest) -> StorageResult<Vec<VectorSearchHit>> {
                Err(StorageError::InvalidInput {
                    capability: StorageCapability::Vectors,
                    operation: "search".into(),
                    message: "injected search failure".into(),
                })
            }
            async fn info(&self) -> StorageResult<VectorStoreInfo> {
                Ok(VectorStoreInfo {
                    model_name: "fail".into(),
                    dimensions: 4,
                    index_kind: VectorIndexKind::SqliteVec,
                    entry_count: 0,
                    needs_rebuild: false,
                    last_rebuild_at: None,
                })
            }
            async fn rebuild(&self, _: IndexRebuildScope) -> StorageResult<VectorStoreInfo> {
                self.info().await
            }
        }

        let store = FailingSearch;
        let requests = vec![search_request(vec![0.1])];
        // Outer result is Ok; the error is in the inner vec.
        let batched = store
            .search_batch(&requests)
            .await
            .expect("outer result must be Ok for batch");
        assert_eq!(batched.len(), 1);
        assert!(
            matches!(batched[0], Err(StorageError::InvalidInput { .. })),
            "inner result must carry the injected search error"
        );
    }

    #[tokio::test]
    async fn orphan_sweep_default_returns_unsupported() {
        let store = TestVectorStore::new();
        let config = OrphanSweepConfig {
            subject_id_allowlist: None,
            namespaces: vec![],
            substrate_kinds: vec![],
            max_delete: 100,
            dry_run: true,
        };
        let result = store.orphan_sweep(&config).await;
        assert!(
            matches!(result, Err(StorageError::Unsupported { .. })),
            "expected Unsupported, got {result:?}"
        );
        assert!(!store.capabilities().supports_orphan_sweep);
    }

    #[tokio::test]
    async fn update_calls_delete_then_insert() {
        let store = TestVectorStore::new();
        let result = run_update(&store).await;
        assert!(result.is_ok(), "update should succeed, got {result:?}");
        assert_eq!(
            store.recorded_calls(),
            vec!["delete", "insert"],
            "delete must be called, then insert"
        );
    }

    #[tokio::test]
    async fn update_propagates_delete_failure() {
        let store = TestVectorStore::with_fail_delete();
        let result = run_update(&store).await;
        assert!(result.is_err());
        assert_eq!(
            store.recorded_calls(),
            vec!["delete"],
            "delete must be attempted and insert must NOT be called when delete fails"
        );
    }

    #[tokio::test]
    async fn update_propagates_insert_failure() {
        let store = TestVectorStore::with_fail_insert();
        let result = run_update(&store).await;
        assert!(result.is_err());
        assert_eq!(
            store.recorded_calls(),
            vec!["delete", "insert"],
            "insert must be attempted after a successful delete"
        );
    }

    #[test]
    fn vector_metadata_filter_is_empty_with_property_filters() {
        let empty = VectorMetadataFilter::default();
        assert!(empty.is_empty());

        let with_ns = VectorMetadataFilter {
            namespaces: vec!["ns".into()],
            ..Default::default()
        };
        assert!(!with_ns.is_empty());

        let with_prop = VectorMetadataFilter {
            property_filters: vec![PropertyFilter {
                key: "k".into(),
                op: PropertyOp::Eq,
                value: serde_json::Value::Bool(true),
            }],
            ..Default::default()
        };
        assert!(!with_prop.is_empty());
    }
}