Skip to main content

velesdb_core/collection/search/
vector.rs

1//! Vector similarity search methods for Collection.
2
3use super::resolve;
4use crate::collection::types::Collection;
5use crate::distance::DistanceMetric;
6use crate::error::{Error, Result};
7use crate::index::VectorIndex;
8use crate::point::{Point, SearchResult};
9use crate::quantization::{distance_pq_l2, PQVector, ProductQuantizer, StorageMode};
10use crate::scored_result::ScoredResult;
11use crate::storage::{PayloadStorage, VectorStorage};
12use crate::validation::validate_dimension_match;
13
14impl Collection {
15    fn search_ids_with_adc_if_pq(&self, query: &[f32], k: usize) -> Vec<ScoredResult> {
16        let config = self.config.read();
17        let is_pq = matches!(config.storage_mode, StorageMode::ProductQuantization);
18        let higher_is_better = config.metric.higher_is_better();
19        let metric = config.metric;
20        // u32 → usize: safe on all 32-bit+ targets (u32::MAX fits in usize).
21        #[allow(clippy::cast_possible_truncation)]
22        let oversampling = config.pq_rescore_oversampling.unwrap_or(0) as usize;
23        drop(config);
24
25        if !is_pq || oversampling == 0 {
26            let results = self.index.search(query, k);
27            return self.merge_delta(results, query, k, metric);
28        }
29
30        let candidates_k = k.saturating_mul(oversampling).max(k + 32);
31        let index_results = self.index.search(query, candidates_k);
32        let rescored =
33            self.rescore_pq_candidates(query, k, metric, higher_is_better, index_results);
34        self.merge_delta(rescored, query, k, metric)
35    }
36
37    /// Rescores PQ candidates using the product quantizer cache.
38    fn rescore_pq_candidates(
39        &self,
40        query: &[f32],
41        k: usize,
42        metric: DistanceMetric,
43        higher_is_better: bool,
44        index_results: Vec<ScoredResult>,
45    ) -> Vec<ScoredResult> {
46        let pq_cache = self.pq_cache.read();
47        let quantizer = self.pq_quantizer.read();
48        let Some(quantizer) = quantizer.as_ref() else {
49            return index_results.into_iter().take(k).collect();
50        };
51
52        let mut rescored: Vec<ScoredResult> = index_results
53            .into_iter()
54            .map(|sr| {
55                let score = pq_cache.get(&sr.id).map_or(sr.score, |pq_vec| {
56                    rescore_with_metric(query, pq_vec, quantizer, metric).unwrap_or_else(|err| {
57                        tracing::warn!(sr.id, %err, "PQ rescore failed; using HNSW score");
58                        sr.score
59                    })
60                });
61                ScoredResult::new(sr.id, score)
62            })
63            .collect();
64
65        resolve::sort_scored_by_metric(&mut rescored, higher_is_better);
66        rescored.truncate(k);
67        rescored
68    }
69
70    /// Merges HNSW results with delta buffer and deferred indexer (if active).
71    ///
72    /// When both the delta buffer and deferred indexer are inactive, this is
73    /// a no-op that returns results unchanged.
74    #[cfg(feature = "persistence")]
75    #[inline]
76    pub(crate) fn merge_delta(
77        &self,
78        results: Vec<ScoredResult>,
79        query: &[f32],
80        k: usize,
81        metric: DistanceMetric,
82    ) -> Vec<ScoredResult> {
83        let after_delta = crate::collection::streaming::merge_with_delta_scored(
84            results,
85            &self.delta_buffer,
86            query,
87            k,
88            metric,
89        );
90        self.merge_deferred_search(after_delta, query, k, metric)
91    }
92
93    /// Merges search results with the deferred indexer buffer.
94    ///
95    /// No-op when deferred indexing is not configured or has no searchable data.
96    #[cfg(feature = "persistence")]
97    fn merge_deferred_search(
98        &self,
99        results: Vec<ScoredResult>,
100        query: &[f32],
101        k: usize,
102        metric: DistanceMetric,
103    ) -> Vec<ScoredResult> {
104        let Some(ref di) = self.deferred_indexer else {
105            return results;
106        };
107        if !di.is_searchable() {
108            return results;
109        }
110        let hnsw_tuples: Vec<(u64, f32)> = results.into_iter().map(Into::into).collect();
111        let merged = di.merge_with_hnsw(hnsw_tuples, query, k, metric);
112        merged.into_iter().map(ScoredResult::from).collect()
113    }
114
115    #[cfg(not(feature = "persistence"))]
116    #[inline]
117    pub(crate) fn merge_delta(
118        &self,
119        results: Vec<ScoredResult>,
120        _query: &[f32],
121        _k: usize,
122        _metric: DistanceMetric,
123    ) -> Vec<ScoredResult> {
124        results
125    }
126}
127
128fn rescore_with_metric(
129    query: &[f32],
130    pq_vec: &PQVector,
131    quantizer: &ProductQuantizer,
132    metric: DistanceMetric,
133) -> Result<f32> {
134    if metric == DistanceMetric::Euclidean {
135        Ok(distance_pq_l2(query, pq_vec, quantizer))
136    } else {
137        // reconstruct() returns a vector in OPQ-rotated space when a rotation matrix is
138        // present. Apply the same rotation to the query so both operands are in the same
139        // space before computing the metric. apply_rotation is a no-op (Cow::Borrowed) when
140        // rotation is None, so this adds no overhead for standard PQ.
141        let rotated_query = quantizer.apply_rotation(query);
142        let reconstructed = quantizer.reconstruct(pq_vec)?;
143        Ok(metric.calculate(&rotated_query, &reconstructed))
144    }
145}
146
147impl Collection {
148    /// Searches for the k nearest neighbors of the query vector.
149    ///
150    /// Uses HNSW index for fast approximate nearest neighbor search.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the query vector dimension doesn't match the collection,
155    /// or if this is a metadata-only collection (use `query()` instead).
156    pub fn search(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
157        let config = self.config.read();
158
159        // Metadata-only collections don't support vector search
160        if config.metadata_only {
161            return Err(Error::SearchNotSupported(config.name.clone()));
162        }
163
164        validate_dimension_match(config.dimension, query.len())?;
165        drop(config);
166
167        // Use HNSW index for fast ANN search
168        let index_results = self.search_ids_with_adc_if_pq(query, k);
169
170        let vector_storage = self.vector_storage.read();
171        let payload_storage = self.payload_storage.read();
172
173        Ok(resolve::resolve_scored_results(
174            &index_results,
175            &*vector_storage,
176            &*payload_storage,
177        ))
178    }
179
180    /// Performs vector similarity search with custom `ef_search` parameter.
181    ///
182    /// Higher `ef_search` = better recall, slower search.
183    /// Default `ef_search` is 128 (Balanced mode).
184    ///
185    /// # Errors
186    ///
187    /// Returns an error if the query vector dimension doesn't match the collection.
188    pub fn search_with_ef(
189        &self,
190        query: &[f32],
191        k: usize,
192        ef_search: usize,
193    ) -> Result<Vec<SearchResult>> {
194        let config = self.config.read();
195
196        validate_dimension_match(config.dimension, query.len())?;
197        drop(config);
198
199        // Convert ef_search to SearchQuality
200        let quality = match ef_search {
201            0..=64 => crate::SearchQuality::Fast,
202            65..=128 => crate::SearchQuality::Balanced,
203            129..=512 => crate::SearchQuality::Accurate,
204            _ => crate::SearchQuality::Perfect,
205        };
206
207        let metric = self.config.read().metric;
208        let index_results = self.index.search_with_quality(query, k, quality);
209        let index_results = self.merge_delta(index_results, query, k, metric);
210
211        let vector_storage = self.vector_storage.read();
212        let payload_storage = self.payload_storage.read();
213
214        Ok(resolve::resolve_scored_results(
215            &index_results,
216            &*vector_storage,
217            &*payload_storage,
218        ))
219    }
220
221    /// Performs fast vector similarity search returning only IDs and scores.
222    ///
223    /// Perf: This is ~3-5x faster than `search()` because it skips vector/payload retrieval.
224    /// Use this when you only need IDs and scores, not full point data.
225    ///
226    /// # Arguments
227    ///
228    /// * `query` - Query vector
229    /// * `k` - Maximum number of results to return
230    ///
231    /// # Returns
232    ///
233    /// Vector of [`ScoredResult`] sorted by similarity.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if the query vector dimension doesn't match the collection.
238    pub fn search_ids(&self, query: &[f32], k: usize) -> Result<Vec<ScoredResult>> {
239        let config = self.config.read();
240
241        validate_dimension_match(config.dimension, query.len())?;
242        drop(config);
243
244        // Perf: Direct HNSW search without vector/payload retrieval
245        let results = self.search_ids_with_adc_if_pq(query, k);
246        Ok(results)
247    }
248
249    /// Searches for the k nearest neighbors with metadata filtering.
250    ///
251    /// Performs post-filtering: retrieves more candidates from HNSW,
252    /// then filters by metadata conditions.
253    ///
254    /// # Arguments
255    ///
256    /// * `query` - Query vector
257    /// * `k` - Maximum number of results to return
258    /// * `filter` - Metadata filter to apply
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if the query vector dimension doesn't match the collection.
263    pub fn search_with_filter(
264        &self,
265        query: &[f32],
266        k: usize,
267        filter: &crate::filter::Filter,
268    ) -> Result<Vec<SearchResult>> {
269        let config = self.config.read();
270        validate_dimension_match(config.dimension, query.len())?;
271        let higher_is_better = config.metric.higher_is_better();
272        drop(config);
273
274        let selectivity = estimate_filter_selectivity(filter);
275        // Reason: k is a small search count (typically <1000); f64 has 52-bit mantissa.
276        #[allow(clippy::cast_precision_loss)]
277        let k_f64 = k as f64;
278        #[allow(clippy::cast_precision_loss)]
279        let lower = (k + 10) as f64;
280        // Reason: result is clamped to [k+10, 10_000] so no truncation risk.
281        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
282        let candidates_k = (k_f64 / selectivity).ceil().clamp(lower, 10_000.0) as usize;
283        let index_results = self.search_ids_with_adc_if_pq(query, candidates_k);
284
285        let vector_storage = self.vector_storage.read();
286        let payload_storage = self.payload_storage.read();
287
288        let mut results: Vec<SearchResult> = index_results
289            .into_iter()
290            .filter_map(|sr| {
291                // Filter-then-hydrate: test payload filter BEFORE retrieving the vector
292                // to avoid expensive vector reads for non-matching candidates.
293                let payload = payload_storage.retrieve(sr.id).ok().flatten();
294                let matches = match payload.as_ref() {
295                    Some(p) => filter.matches(p),
296                    None => filter.matches(&serde_json::Value::Null),
297                };
298                if !matches {
299                    return None;
300                }
301                let vector = vector_storage.retrieve(sr.id).ok().flatten()?;
302                Some(SearchResult::new(
303                    Point {
304                        id: sr.id,
305                        vector,
306                        payload,
307                        sparse_vectors: None,
308                    },
309                    sr.score,
310                ))
311            })
312            .collect();
313
314        resolve::sort_results_by_metric(&mut results, higher_is_better);
315        results.truncate(k);
316        Ok(results)
317    }
318}
319
320/// Heuristic selectivity estimate based on filter structure.
321///
322/// Returns a value in `(0, 1]` where 1.0 = no filtering, 0.01 = very selective.
323/// Used to compute dynamic over-fetch factor for `search_with_filter`.
324fn estimate_filter_selectivity(filter: &crate::filter::Filter) -> f64 {
325    estimate_condition_selectivity(&filter.condition)
326}
327
328fn estimate_condition_selectivity(cond: &crate::filter::Condition) -> f64 {
329    use crate::filter::Condition;
330    match cond {
331        Condition::Eq { .. } | Condition::IsNull { .. } => 0.1,
332        Condition::Gt { .. }
333        | Condition::Gte { .. }
334        | Condition::Lt { .. }
335        | Condition::Lte { .. }
336        | Condition::Contains { .. }
337        | Condition::Like { .. }
338        | Condition::ILike { .. } => 0.3,
339        Condition::In { values, .. } => {
340            // Reason: values.len() is a small count; f64 precision is sufficient.
341            #[allow(clippy::cast_precision_loss)]
342            let sel = values.len() as f64 * 0.05;
343            sel.min(0.8)
344        }
345        Condition::Neq { .. } | Condition::IsNotNull { .. } => 0.9,
346        Condition::And { conditions } => conditions
347            .iter()
348            .map(estimate_condition_selectivity)
349            .product::<f64>()
350            .max(0.01),
351        Condition::Or { conditions } => conditions
352            .iter()
353            .map(estimate_condition_selectivity)
354            .sum::<f64>()
355            .min(1.0),
356        Condition::Not { condition } => (1.0 - estimate_condition_selectivity(condition)).max(0.01),
357    }
358}