velesdb_core/collection/search/
batch.rs1use super::resolve;
4use crate::collection::types::Collection;
5use crate::error::{Error, Result};
6use crate::index::SearchQuality;
7use crate::point::{Point, SearchResult};
8use crate::storage::{PayloadStorage, VectorStorage};
9use crate::validation::validate_dimension_match;
10
11impl Collection {
12 pub fn search_batch_with_filters(
29 &self,
30 queries: &[&[f32]],
31 k: usize,
32 filters: &[Option<crate::filter::Filter>],
33 ) -> Result<Vec<Vec<SearchResult>>> {
34 if queries.len() != filters.len() {
35 return Err(Error::Config(format!(
36 "Queries count ({}) does not match filters count ({})",
37 queries.len(),
38 filters.len()
39 )));
40 }
41
42 let dimension = self.config.read().dimension;
43 for query in queries {
44 validate_dimension_match(dimension, query.len())?;
45 }
46
47 let candidates_k = k.saturating_mul(4).max(k + 10);
48 let metric = self.config.read().metric;
49 let higher_is_better = metric.higher_is_better();
50 let index_results =
51 self.index
52 .search_batch_parallel(queries, candidates_k, SearchQuality::Balanced);
53
54 let vector_storage = self.vector_storage.read();
55 let payload_storage = self.payload_storage.read();
56
57 let mut all_results = Vec::with_capacity(queries.len());
58
59 for ((query_results, filter_opt), query) in
60 index_results.into_iter().zip(filters).zip(queries)
61 {
62 let query_results = self.merge_delta(query_results, query, candidates_k, metric);
63 let mut filtered = Self::filter_and_resolve_batch(
64 &query_results,
65 filter_opt.as_ref(),
66 &*vector_storage,
67 &*payload_storage,
68 );
69 resolve::sort_results_by_metric(&mut filtered, higher_is_better);
70 filtered.truncate(k);
71 all_results.push(filtered);
72 }
73
74 Ok(all_results)
75 }
76
77 fn filter_and_resolve_batch(
79 results: &[crate::scored_result::ScoredResult],
80 filter: Option<&crate::filter::Filter>,
81 vector_storage: &dyn VectorStorage,
82 payload_storage: &dyn PayloadStorage,
83 ) -> Vec<SearchResult> {
84 results
85 .iter()
86 .filter_map(|sr| {
87 let payload = payload_storage.retrieve(sr.id).ok().flatten();
88 if let Some(f) = filter {
89 let matches = match payload.as_ref() {
90 Some(p) => f.matches(p),
91 None => f.matches(&serde_json::Value::Null),
92 };
93 if !matches {
94 return None;
95 }
96 }
97 let vector = vector_storage.retrieve(sr.id).ok().flatten()?;
98 Some(SearchResult {
99 point: Point {
100 id: sr.id,
101 vector,
102 payload,
103 sparse_vectors: None,
104 },
105 score: sr.score,
106 })
107 })
108 .collect()
109 }
110
111 pub fn search_batch_with_filter(
123 &self,
124 queries: &[&[f32]],
125 k: usize,
126 filter: &crate::filter::Filter,
127 ) -> Result<Vec<Vec<SearchResult>>> {
128 let filters: Vec<Option<crate::filter::Filter>> = vec![Some(filter.clone()); queries.len()];
129 self.search_batch_with_filters(queries, k, &filters)
130 }
131
132 pub fn search_batch_parallel(
149 &self,
150 queries: &[&[f32]],
151 k: usize,
152 ) -> Result<Vec<Vec<SearchResult>>> {
153 let config = self.config.read();
154 let dimension = config.dimension;
155 drop(config);
156
157 for query in queries {
159 validate_dimension_match(dimension, query.len())?;
160 }
161
162 let metric = self.config.read().metric;
164 let index_results = self
165 .index
166 .search_batch_parallel(queries, k, SearchQuality::Balanced);
167
168 let vector_storage = self.vector_storage.read();
170 let payload_storage = self.payload_storage.read();
171
172 let results: Vec<Vec<SearchResult>> = index_results
173 .into_iter()
174 .zip(queries)
175 .map(|(query_results, query)| {
176 let query_results = self.merge_delta(query_results, query, k, metric);
178 resolve::resolve_scored_results(&query_results, &*vector_storage, &*payload_storage)
179 })
180 .collect();
181
182 Ok(results)
183 }
184
185 #[allow(clippy::needless_pass_by_value)]
210 pub fn multi_query_search(
211 &self,
212 vectors: &[&[f32]],
213 top_k: usize,
214 fusion: crate::fusion::FusionStrategy,
215 filter: Option<&crate::filter::Filter>,
216 ) -> Result<Vec<SearchResult>> {
217 let metric = self.validate_multi_query_inputs(vectors)?;
218 let overfetch_k = Self::overfetch_factor(top_k);
219
220 let batch_results = self.search_and_merge_delta(vectors, overfetch_k, metric);
221 let filtered = self.apply_pre_fusion_filter(batch_results, filter);
222
223 let fused = fusion
224 .fuse(filtered)
225 .map_err(|e| Error::Config(format!("Fusion error: {e}")))?;
226
227 Ok(self.hydrate_fused_results(&fused, top_k))
228 }
229
230 fn validate_multi_query_inputs(&self, vectors: &[&[f32]]) -> Result<crate::DistanceMetric> {
232 const MAX_VECTORS: usize = 10;
233
234 if vectors.is_empty() {
235 return Err(Error::Config(
236 "multi_query_search requires at least one vector".into(),
237 ));
238 }
239 if vectors.len() > MAX_VECTORS {
240 return Err(Error::Config(format!(
241 "multi_query_search supports at most {MAX_VECTORS} vectors, got {}",
242 vectors.len()
243 )));
244 }
245
246 let config = self.config.read();
247 let dimension = config.dimension;
248 let metric = config.metric;
249 drop(config);
250
251 for vector in vectors {
252 validate_dimension_match(dimension, vector.len())?;
253 }
254
255 Ok(metric)
256 }
257
/// Returns how many candidates to fetch per query vector for a requested
/// `top_k`, so that fusion and filtering have spare candidates to work with.
///
/// The multiplier shrinks as `top_k` grows: x20 up to 10, x10 up to 50,
/// x5 up to 100, and x2 beyond that. The result saturates at `usize::MAX`
/// instead of overflowing for very large `top_k`.
fn overfetch_factor(top_k: usize) -> usize {
    let multiplier: usize = match top_k {
        0..=10 => 20,
        11..=50 => 10,
        51..=100 => 5,
        _ => 2,
    };
    // Only the open-ended tier can actually overflow, but saturating
    // uniformly keeps the arithmetic obviously safe.
    top_k.saturating_mul(multiplier)
}
267
268 fn search_and_merge_delta(
270 &self,
271 vectors: &[&[f32]],
272 overfetch_k: usize,
273 metric: crate::DistanceMetric,
274 ) -> Vec<Vec<(u64, f32)>> {
275 let batch_results =
276 self.index
277 .search_batch_parallel(vectors, overfetch_k, crate::SearchQuality::Balanced);
278
279 batch_results
280 .into_iter()
281 .zip(vectors)
282 .map(|(query_results, query)| {
283 self.merge_delta(query_results, query, overfetch_k, metric)
284 .into_iter()
285 .map(Into::into)
286 .collect()
287 })
288 .collect()
289 }
290
291 fn apply_pre_fusion_filter(
293 &self,
294 batch_results: Vec<Vec<(u64, f32)>>,
295 filter: Option<&crate::filter::Filter>,
296 ) -> Vec<Vec<(u64, f32)>> {
297 let Some(f) = filter else {
298 return batch_results;
299 };
300 let payload_storage = self.payload_storage.read();
301 batch_results
302 .into_iter()
303 .map(|query_results| {
304 query_results
305 .into_iter()
306 .filter(|(id, _score)| {
307 if let Ok(Some(payload)) = payload_storage.retrieve(*id) {
308 f.matches(&payload)
309 } else {
310 false
311 }
312 })
313 .collect()
314 })
315 .collect()
316 }
317
318 fn hydrate_fused_results(&self, fused: &[(u64, f32)], top_k: usize) -> Vec<SearchResult> {
320 let vector_storage = self.vector_storage.read();
321 let payload_storage = self.payload_storage.read();
322
323 resolve::resolve_id_score_pairs(fused, top_k, &*vector_storage, &*payload_storage)
324 }
325
326 #[allow(clippy::needless_pass_by_value)]
350 pub fn multi_query_search_ids(
351 &self,
352 vectors: &[&[f32]],
353 top_k: usize,
354 fusion: crate::fusion::FusionStrategy,
355 ) -> Result<Vec<(u64, f32)>> {
356 let metric = self.validate_multi_query_inputs(vectors)?;
357 let overfetch_k = Self::overfetch_factor(top_k);
358
359 let batch_results = self.search_and_merge_delta(vectors, overfetch_k, metric);
360
361 let fused = fusion
362 .fuse(batch_results)
363 .map_err(|e| Error::Config(format!("Fusion error: {e}")))?;
364
365 Ok(fused.into_iter().take(top_k).collect())
366 }
367}