velesdb_core/collection/search/
vector.rs1use crate::collection::types::Collection;
4use crate::distance::DistanceMetric;
5use crate::error::{Error, Result};
6use crate::index::VectorIndex;
7use crate::point::{Point, SearchResult};
8use crate::quantization::{distance_pq_l2, PQVector, ProductQuantizer, StorageMode};
9use crate::storage::{PayloadStorage, VectorStorage};
10
11impl Collection {
12 fn search_ids_with_adc_if_pq(&self, query: &[f32], k: usize) -> Vec<(u64, f32)> {
13 let config = self.config.read();
14 let is_pq = matches!(config.storage_mode, StorageMode::ProductQuantization);
15 let higher_is_better = config.metric.higher_is_better();
16 let metric = config.metric;
17 #[allow(clippy::cast_possible_truncation)]
19 let oversampling = config.pq_rescore_oversampling.unwrap_or(0) as usize;
20 drop(config);
21
22 if !is_pq {
23 let results = self.index.search(query, k);
24 return self.merge_delta(results, query, k, metric);
25 }
26
27 if oversampling == 0 {
30 let results = self.index.search(query, k);
31 return self.merge_delta(results, query, k, metric);
32 }
33
34 let candidates_k = k.saturating_mul(oversampling).max(k + 32);
35 let index_results = self.index.search(query, candidates_k);
36
37 let pq_cache = self.pq_cache.read();
38 let quantizer = self.pq_quantizer.read();
39 let Some(quantizer) = quantizer.as_ref() else {
40 let results: Vec<(u64, f32)> = index_results.into_iter().take(k).collect();
41 return self.merge_delta(results, query, k, metric);
42 };
43
44 let mut rescored: Vec<(u64, f32)> = index_results
45 .into_iter()
46 .map(|(id, fallback_score)| {
47 let score = pq_cache.get(&id).map_or(fallback_score, |pq_vec| {
48 rescore_with_metric(query, pq_vec, quantizer, metric).unwrap_or_else(|err| {
49 tracing::warn!(
50 id,
51 %err,
52 "PQ reconstruct failed during rescore; \
53 falling back to HNSW score"
54 );
55 fallback_score
56 })
57 });
58 (id, score)
59 })
60 .collect();
61
62 rescored.sort_by(|a, b| {
63 if higher_is_better {
64 b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
65 } else {
66 a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal)
67 }
68 });
69 rescored.truncate(k);
70 self.merge_delta(rescored, query, k, metric)
71 }
72
73 #[cfg(feature = "persistence")]
78 #[inline]
79 pub(crate) fn merge_delta(
80 &self,
81 results: Vec<(u64, f32)>,
82 query: &[f32],
83 k: usize,
84 metric: DistanceMetric,
85 ) -> Vec<(u64, f32)> {
86 crate::collection::streaming::merge_with_delta(
87 results,
88 &self.delta_buffer,
89 query,
90 k,
91 metric,
92 )
93 }
94
95 #[cfg(not(feature = "persistence"))]
96 #[inline]
97 pub(crate) fn merge_delta(
98 &self,
99 results: Vec<(u64, f32)>,
100 _query: &[f32],
101 _k: usize,
102 _metric: DistanceMetric,
103 ) -> Vec<(u64, f32)> {
104 results
105 }
106}
107
108fn rescore_with_metric(
109 query: &[f32],
110 pq_vec: &PQVector,
111 quantizer: &ProductQuantizer,
112 metric: DistanceMetric,
113) -> Result<f32> {
114 if metric == DistanceMetric::Euclidean {
115 Ok(distance_pq_l2(query, pq_vec, &quantizer.codebook))
116 } else {
117 let reconstructed = quantizer.reconstruct(pq_vec)?;
118 Ok(metric.calculate(query, &reconstructed))
119 }
120}
121
122impl Collection {
123 pub fn search(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
132 let config = self.config.read();
133
134 if config.metadata_only {
136 return Err(Error::SearchNotSupported(config.name.clone()));
137 }
138
139 if query.len() != config.dimension {
140 return Err(Error::DimensionMismatch {
141 expected: config.dimension,
142 actual: query.len(),
143 });
144 }
145 drop(config);
146
147 let index_results = self.search_ids_with_adc_if_pq(query, k);
149
150 let vector_storage = self.vector_storage.read();
151 let payload_storage = self.payload_storage.read();
152
153 let results: Vec<SearchResult> = index_results
155 .into_iter()
156 .filter_map(|(id, score)| {
157 let vector = vector_storage.retrieve(id).ok().flatten()?;
159 let payload = payload_storage.retrieve(id).ok().flatten();
160
161 let point = Point {
162 id,
163 vector,
164 payload,
165 sparse_vectors: None,
166 };
167
168 Some(SearchResult::new(point, score))
169 })
170 .collect();
171
172 Ok(results)
173 }
174
175 pub fn search_with_ef(
184 &self,
185 query: &[f32],
186 k: usize,
187 ef_search: usize,
188 ) -> Result<Vec<SearchResult>> {
189 let config = self.config.read();
190
191 if query.len() != config.dimension {
192 return Err(Error::DimensionMismatch {
193 expected: config.dimension,
194 actual: query.len(),
195 });
196 }
197 drop(config);
198
199 let quality = match ef_search {
201 0..=64 => crate::SearchQuality::Fast,
202 65..=128 => crate::SearchQuality::Balanced,
203 129..=256 => crate::SearchQuality::Accurate,
204 _ => crate::SearchQuality::Perfect,
205 };
206
207 let metric = self.config.read().metric;
208 let index_results = self.index.search_with_quality(query, k, quality);
209 let index_results = self.merge_delta(index_results, query, k, metric);
210
211 let vector_storage = self.vector_storage.read();
212 let payload_storage = self.payload_storage.read();
213
214 let results: Vec<SearchResult> = index_results
215 .into_iter()
216 .filter_map(|(id, score)| {
217 let vector = vector_storage.retrieve(id).ok().flatten()?;
218 let payload = payload_storage.retrieve(id).ok().flatten();
219
220 let point = Point {
221 id,
222 vector,
223 payload,
224 sparse_vectors: None,
225 };
226
227 Some(SearchResult::new(point, score))
228 })
229 .collect();
230
231 Ok(results)
232 }
233
234 pub fn search_ids(&self, query: &[f32], k: usize) -> Result<Vec<(u64, f32)>> {
252 let config = self.config.read();
253
254 if query.len() != config.dimension {
255 return Err(Error::DimensionMismatch {
256 expected: config.dimension,
257 actual: query.len(),
258 });
259 }
260 drop(config);
261
262 let results = self.search_ids_with_adc_if_pq(query, k);
264 Ok(results)
265 }
266
267 pub fn search_with_filter(
282 &self,
283 query: &[f32],
284 k: usize,
285 filter: &crate::filter::Filter,
286 ) -> Result<Vec<SearchResult>> {
287 let config = self.config.read();
288
289 if query.len() != config.dimension {
290 return Err(Error::DimensionMismatch {
291 expected: config.dimension,
292 actual: query.len(),
293 });
294 }
295 drop(config);
296
297 let candidates_k = k.saturating_mul(4).max(k + 10);
300 let index_results = self.search_ids_with_adc_if_pq(query, candidates_k);
301
302 let vector_storage = self.vector_storage.read();
303 let payload_storage = self.payload_storage.read();
304
305 let mut results: Vec<SearchResult> = index_results
308 .into_iter()
309 .filter_map(|(id, score)| {
310 let vector = vector_storage.retrieve(id).ok().flatten()?;
311 let payload = payload_storage.retrieve(id).ok().flatten();
312
313 let matches = match payload.as_ref() {
316 Some(p) => filter.matches(p),
317 None => filter.matches(&serde_json::Value::Null),
318 };
319 if !matches {
320 return None;
321 }
322
323 let point = Point {
324 id,
325 vector,
326 payload,
327 sparse_vectors: None,
328 };
329
330 Some(SearchResult::new(point, score))
331 })
332 .collect();
333
334 let config = self.config.read();
338 let higher_is_better = config.metric.higher_is_better();
339 drop(config);
340
341 results.sort_by(|a, b| {
342 if higher_is_better {
343 b.score
345 .partial_cmp(&a.score)
346 .unwrap_or(std::cmp::Ordering::Equal)
347 } else {
348 a.score
350 .partial_cmp(&b.score)
351 .unwrap_or(std::cmp::Ordering::Equal)
352 }
353 });
354 results.truncate(k);
355
356 Ok(results)
357 }
358}