Skip to main content

velesdb_core/collection/search/
batch.rs

1//! Batch and multi-query search methods for Collection.
2
3use super::resolve;
4use crate::collection::types::Collection;
5use crate::error::{Error, Result};
6use crate::index::SearchQuality;
7use crate::point::{Point, SearchResult};
8use crate::storage::{PayloadStorage, VectorStorage};
9use crate::validation::validate_dimension_match;
10
11impl Collection {
12    /// Performs batch search for multiple query vectors in parallel with metadata filtering.
13    /// Supports a different filter for each query in the batch.
14    ///
15    /// # Arguments
16    ///
17    /// * `queries` - List of query vector slices
18    /// * `k` - Maximum number of results per query
19    /// * `filters` - List of optional filters (must match queries length)
20    ///
21    /// # Returns
22    ///
23    /// Vector of search results for each query, matching its respective filter.
24    ///
25    /// # Errors
26    ///
27    /// Returns an error if queries and filters have different lengths or dimension mismatch.
28    pub fn search_batch_with_filters(
29        &self,
30        queries: &[&[f32]],
31        k: usize,
32        filters: &[Option<crate::filter::Filter>],
33    ) -> Result<Vec<Vec<SearchResult>>> {
34        if queries.len() != filters.len() {
35            return Err(Error::Config(format!(
36                "Queries count ({}) does not match filters count ({})",
37                queries.len(),
38                filters.len()
39            )));
40        }
41
42        let dimension = self.config.read().dimension;
43        for query in queries {
44            validate_dimension_match(dimension, query.len())?;
45        }
46
47        let candidates_k = k.saturating_mul(4).max(k + 10);
48        let metric = self.config.read().metric;
49        let higher_is_better = metric.higher_is_better();
50        let index_results =
51            self.index
52                .search_batch_parallel(queries, candidates_k, SearchQuality::Balanced);
53
54        let vector_storage = self.vector_storage.read();
55        let payload_storage = self.payload_storage.read();
56
57        let mut all_results = Vec::with_capacity(queries.len());
58
59        for ((query_results, filter_opt), query) in
60            index_results.into_iter().zip(filters).zip(queries)
61        {
62            let query_results = self.merge_delta(query_results, query, candidates_k, metric);
63            let mut filtered = Self::filter_and_resolve_batch(
64                &query_results,
65                filter_opt.as_ref(),
66                &*vector_storage,
67                &*payload_storage,
68            );
69            resolve::sort_results_by_metric(&mut filtered, higher_is_better);
70            filtered.truncate(k);
71            all_results.push(filtered);
72        }
73
74        Ok(all_results)
75    }
76
77    /// Filters and resolves a single batch query's results.
78    fn filter_and_resolve_batch(
79        results: &[crate::scored_result::ScoredResult],
80        filter: Option<&crate::filter::Filter>,
81        vector_storage: &dyn VectorStorage,
82        payload_storage: &dyn PayloadStorage,
83    ) -> Vec<SearchResult> {
84        results
85            .iter()
86            .filter_map(|sr| {
87                let payload = payload_storage.retrieve(sr.id).ok().flatten();
88                if let Some(f) = filter {
89                    let matches = match payload.as_ref() {
90                        Some(p) => f.matches(p),
91                        None => f.matches(&serde_json::Value::Null),
92                    };
93                    if !matches {
94                        return None;
95                    }
96                }
97                let vector = vector_storage.retrieve(sr.id).ok().flatten()?;
98                Some(SearchResult {
99                    point: Point {
100                        id: sr.id,
101                        vector,
102                        payload,
103                        sparse_vectors: None,
104                    },
105                    score: sr.score,
106                })
107            })
108            .collect()
109    }
110
111    /// Performs batch search for multiple query vectors in parallel with a single metadata filter.
112    ///
113    /// # Arguments
114    ///
115    /// * `queries` - List of query vector slices
116    /// * `k` - Maximum number of results per query
117    /// * `filter` - Metadata filter to apply to all results
118    ///
119    /// # Errors
120    ///
121    /// Returns an error if any query has incorrect dimension.
122    pub fn search_batch_with_filter(
123        &self,
124        queries: &[&[f32]],
125        k: usize,
126        filter: &crate::filter::Filter,
127    ) -> Result<Vec<Vec<SearchResult>>> {
128        let filters: Vec<Option<crate::filter::Filter>> = vec![Some(filter.clone()); queries.len()];
129        self.search_batch_with_filters(queries, k, &filters)
130    }
131
132    /// Performs batch search for multiple query vectors in parallel.
133    ///
134    /// This method is optimized for high throughput using parallel index traversal.
135    ///
136    /// # Arguments
137    ///
138    /// * `queries` - List of query vector slices
139    /// * `k` - Maximum number of results per query
140    ///
141    /// # Returns
142    ///
143    /// Vector of search results for each query, with full point data.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if any query vector dimension doesn't match the collection.
148    pub fn search_batch_parallel(
149        &self,
150        queries: &[&[f32]],
151        k: usize,
152    ) -> Result<Vec<Vec<SearchResult>>> {
153        let config = self.config.read();
154        let dimension = config.dimension;
155        drop(config);
156
157        // Validate all query dimensions first
158        for query in queries {
159            validate_dimension_match(dimension, query.len())?;
160        }
161
162        // Perf: Use parallel HNSW search (P0 optimization)
163        let metric = self.config.read().metric;
164        let index_results = self
165            .index
166            .search_batch_parallel(queries, k, SearchQuality::Balanced);
167
168        // Map results to SearchResult with full point data
169        let vector_storage = self.vector_storage.read();
170        let payload_storage = self.payload_storage.read();
171
172        let results: Vec<Vec<SearchResult>> = index_results
173            .into_iter()
174            .zip(queries)
175            .map(|(query_results, query)| {
176                // Merge with delta buffer per query
177                let query_results = self.merge_delta(query_results, query, k, metric);
178                resolve::resolve_scored_results(&query_results, &*vector_storage, &*payload_storage)
179            })
180            .collect();
181
182        Ok(results)
183    }
184
185    /// Performs multi-query search with result fusion.
186    ///
187    /// This method executes parallel searches for multiple query vectors and fuses
188    /// the results using the specified fusion strategy. Ideal for Multiple Query
189    /// Generation (MQG) pipelines where multiple reformulations of a user query
190    /// are searched simultaneously.
191    ///
192    /// # Arguments
193    ///
194    /// * `vectors` - Slice of query vectors (all must have same dimension)
195    /// * `top_k` - Maximum number of results to return after fusion
196    /// * `fusion` - Strategy for combining results (Average, Maximum, RRF, Weighted)
197    /// * `filter` - Optional metadata filter to apply to all queries
198    ///
199    /// # Returns
200    ///
201    /// Vector of `SearchResult` sorted by fused score descending.
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if:
206    /// - `vectors` is empty
207    /// - Any vector has incorrect dimension
208    /// - More than 10 vectors are provided (configurable limit)
209    #[allow(clippy::needless_pass_by_value)]
210    pub fn multi_query_search(
211        &self,
212        vectors: &[&[f32]],
213        top_k: usize,
214        fusion: crate::fusion::FusionStrategy,
215        filter: Option<&crate::filter::Filter>,
216    ) -> Result<Vec<SearchResult>> {
217        let metric = self.validate_multi_query_inputs(vectors)?;
218        let overfetch_k = Self::overfetch_factor(top_k);
219
220        let batch_results = self.search_and_merge_delta(vectors, overfetch_k, metric);
221        let filtered = self.apply_pre_fusion_filter(batch_results, filter);
222
223        let fused = fusion
224            .fuse(filtered)
225            .map_err(|e| Error::Config(format!("Fusion error: {e}")))?;
226
227        Ok(self.hydrate_fused_results(&fused, top_k))
228    }
229
230    /// Validates inputs for `multi_query_search` and returns the distance metric.
231    fn validate_multi_query_inputs(&self, vectors: &[&[f32]]) -> Result<crate::DistanceMetric> {
232        const MAX_VECTORS: usize = 10;
233
234        if vectors.is_empty() {
235            return Err(Error::Config(
236                "multi_query_search requires at least one vector".into(),
237            ));
238        }
239        if vectors.len() > MAX_VECTORS {
240            return Err(Error::Config(format!(
241                "multi_query_search supports at most {MAX_VECTORS} vectors, got {}",
242                vectors.len()
243            )));
244        }
245
246        let config = self.config.read();
247        let dimension = config.dimension;
248        let metric = config.metric;
249        drop(config);
250
251        for vector in vectors {
252            validate_dimension_match(dimension, vector.len())?;
253        }
254
255        Ok(metric)
256    }
257
258    /// Calculates the overfetch factor for better fusion quality.
259    fn overfetch_factor(top_k: usize) -> usize {
260        match top_k {
261            0..=10 => top_k * 20,
262            11..=50 => top_k * 10,
263            51..=100 => top_k * 5,
264            _ => top_k * 2,
265        }
266    }
267
268    /// Runs batch index search and merges delta buffer per query.
269    fn search_and_merge_delta(
270        &self,
271        vectors: &[&[f32]],
272        overfetch_k: usize,
273        metric: crate::DistanceMetric,
274    ) -> Vec<Vec<(u64, f32)>> {
275        let batch_results =
276            self.index
277                .search_batch_parallel(vectors, overfetch_k, crate::SearchQuality::Balanced);
278
279        batch_results
280            .into_iter()
281            .zip(vectors)
282            .map(|(query_results, query)| {
283                self.merge_delta(query_results, query, overfetch_k, metric)
284                    .into_iter()
285                    .map(Into::into)
286                    .collect()
287            })
288            .collect()
289    }
290
291    /// Applies metadata filter to batch results before fusion.
292    fn apply_pre_fusion_filter(
293        &self,
294        batch_results: Vec<Vec<(u64, f32)>>,
295        filter: Option<&crate::filter::Filter>,
296    ) -> Vec<Vec<(u64, f32)>> {
297        let Some(f) = filter else {
298            return batch_results;
299        };
300        let payload_storage = self.payload_storage.read();
301        batch_results
302            .into_iter()
303            .map(|query_results| {
304                query_results
305                    .into_iter()
306                    .filter(|(id, _score)| {
307                        if let Ok(Some(payload)) = payload_storage.retrieve(*id) {
308                            f.matches(&payload)
309                        } else {
310                            false
311                        }
312                    })
313                    .collect()
314            })
315            .collect()
316    }
317
318    /// Fetches full point data for the top-k fused results.
319    fn hydrate_fused_results(&self, fused: &[(u64, f32)], top_k: usize) -> Vec<SearchResult> {
320        let vector_storage = self.vector_storage.read();
321        let payload_storage = self.payload_storage.read();
322
323        resolve::resolve_id_score_pairs(fused, top_k, &*vector_storage, &*payload_storage)
324    }
325
326    /// Performs multi-query search returning only IDs and fused scores.
327    ///
328    /// This is a faster variant of `multi_query_search` that skips fetching
329    /// vector and payload data. Use when you only need document IDs.
330    ///
331    /// Reuses [`validate_multi_query_inputs`](Self::validate_multi_query_inputs),
332    /// [`overfetch_factor`](Self::overfetch_factor), and
333    /// [`search_and_merge_delta`](Self::search_and_merge_delta) to eliminate
334    /// duplication with `multi_query_search`.
335    ///
336    /// # Arguments
337    ///
338    /// * `vectors` - Slice of query vectors
339    /// * `top_k` - Maximum number of results
340    /// * `fusion` - Fusion strategy
341    ///
342    /// # Returns
343    ///
344    /// Vector of `(id, fused_score)` tuples sorted by score descending.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if vectors is empty, exceeds max limit, or has dimension mismatch.
349    #[allow(clippy::needless_pass_by_value)]
350    pub fn multi_query_search_ids(
351        &self,
352        vectors: &[&[f32]],
353        top_k: usize,
354        fusion: crate::fusion::FusionStrategy,
355    ) -> Result<Vec<(u64, f32)>> {
356        let metric = self.validate_multi_query_inputs(vectors)?;
357        let overfetch_k = Self::overfetch_factor(top_k);
358
359        let batch_results = self.search_and_merge_delta(vectors, overfetch_k, metric);
360
361        let fused = fusion
362            .fuse(batch_results)
363            .map_err(|e| Error::Config(format!("Fusion error: {e}")))?;
364
365        Ok(fused.into_iter().take(top_k).collect())
366    }
367}