1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
//! Batch and multi-query search methods for Collection.
use super::resolve;
use crate::collection::types::Collection;
use crate::error::{Error, Result};
use crate::index::SearchQuality;
use crate::point::{Point, SearchResult};
use crate::storage::{PayloadStorage, VectorStorage};
use crate::validation::validate_dimension_match;
impl Collection {
/// Performs batch search for multiple query vectors in parallel, applying a
/// separate optional metadata filter to each query.
///
/// Each query over-fetches `max(4 * k, k + 10)` candidates from the index and
/// merges them with the delta buffer before filtering, so that enough hits
/// survive the filter. A query whose filter rejects most candidates may still
/// return fewer than `k` results.
///
/// # Arguments
///
/// * `queries` - List of query vector slices
/// * `k` - Maximum number of results per query
/// * `filters` - List of optional filters, one per query (must match `queries` length)
///
/// # Returns
///
/// One result list per query, in input order. Each list contains only points
/// accepted by that query's filter, is sorted according to the collection
/// metric, and is truncated to `k`.
///
/// # Errors
///
/// Returns an error if `queries` and `filters` have different lengths, or if any
/// query's dimension does not match the collection.
pub fn search_batch_with_filters(
    &self,
    queries: &[&[f32]],
    k: usize,
    filters: &[Option<crate::filter::Filter>],
) -> Result<Vec<Vec<SearchResult>>> {
    if queries.len() != filters.len() {
        return Err(Error::Config(format!(
            "Queries count ({}) does not match filters count ({})",
            queries.len(),
            filters.len()
        )));
    }
    // Snapshot dimension and metric under a single read guard so both values
    // come from the same configuration state.
    let (dimension, metric) = {
        let config = self.config.read();
        (config.dimension, config.metric)
    };
    for query in queries {
        validate_dimension_match(dimension, query.len())?;
    }
    // Over-fetch so post-filtering still leaves enough candidates. Saturating
    // arithmetic keeps a huge `k` from overflowing (which panics in debug).
    let candidates_k = k.saturating_mul(4).max(k.saturating_add(10));
    let higher_is_better = metric.higher_is_better();
    let index_results =
        self.index
            .search_batch_parallel(queries, candidates_k, SearchQuality::Balanced);
    let vector_storage = self.vector_storage.read();
    let payload_storage = self.payload_storage.read();
    let mut all_results = Vec::with_capacity(queries.len());
    for ((query_results, filter_opt), query) in
        index_results.into_iter().zip(filters).zip(queries)
    {
        let query_results = self.merge_delta(query_results, query, candidates_k, metric);
        let mut filtered = Self::filter_and_resolve_batch(
            &query_results,
            filter_opt.as_ref(),
            &*vector_storage,
            &*payload_storage,
        );
        resolve::sort_results_by_metric(&mut filtered, higher_is_better);
        filtered.truncate(k);
        all_results.push(filtered);
    }
    Ok(all_results)
}
/// Applies `filter` (when present) to one query's scored candidates and turns
/// the survivors into full `SearchResult`s.
///
/// A missing payload, or a payload lookup that errors, is treated as "no
/// payload", and the filter is then evaluated against `serde_json::Value::Null`.
/// Candidates whose vector cannot be loaded are skipped. Input order is kept.
fn filter_and_resolve_batch(
    results: &[crate::scored_result::ScoredResult],
    filter: Option<&crate::filter::Filter>,
    vector_storage: &dyn VectorStorage,
    payload_storage: &dyn PayloadStorage,
) -> Vec<SearchResult> {
    let mut resolved = Vec::new();
    for candidate in results {
        let id = candidate.id;
        let payload = payload_storage.retrieve(id).ok().flatten();
        if let Some(active) = filter {
            let admitted = match payload.as_ref() {
                Some(value) => active.matches(value),
                None => active.matches(&serde_json::Value::Null),
            };
            if !admitted {
                continue;
            }
        }
        // Only fetch the vector once the candidate has passed the filter.
        let Some(vector) = vector_storage.retrieve(id).ok().flatten() else {
            continue;
        };
        resolved.push(SearchResult {
            point: Point {
                id,
                vector,
                payload,
                sparse_vectors: None,
            },
            score: candidate.score,
        });
    }
    resolved
}
/// Performs batch search for multiple query vectors in parallel, applying one
/// shared metadata filter to every query.
///
/// This is a convenience wrapper around
/// [`search_batch_with_filters`](Self::search_batch_with_filters) that gives
/// each query its own copy of `filter`.
///
/// # Arguments
///
/// * `queries` - List of query vector slices
/// * `k` - Maximum number of results per query
/// * `filter` - Metadata filter applied to the results of every query
///
/// # Errors
///
/// Returns an error if any query has incorrect dimension.
pub fn search_batch_with_filter(
    &self,
    queries: &[&[f32]],
    k: usize,
    filter: &crate::filter::Filter,
) -> Result<Vec<Vec<SearchResult>>> {
    // The per-query API expects one owned `Option<Filter>` per query.
    let per_query_filters: Vec<Option<crate::filter::Filter>> =
        queries.iter().map(|_| Some(filter.clone())).collect();
    self.search_batch_with_filters(queries, k, &per_query_filters)
}
/// Performs batch search for multiple query vectors in parallel.
///
/// This method is optimized for high throughput using parallel index traversal.
/// Each query's index hits are merged with the delta buffer before being
/// resolved into full point data.
///
/// # Arguments
///
/// * `queries` - List of query vector slices
/// * `k` - Maximum number of results per query
///
/// # Returns
///
/// Vector of search results for each query (in input order), with full point data.
///
/// # Errors
///
/// Returns an error if any query vector dimension doesn't match the collection.
pub fn search_batch_parallel(
    &self,
    queries: &[&[f32]],
    k: usize,
) -> Result<Vec<Vec<SearchResult>>> {
    // Read dimension and metric under one guard so they are mutually consistent
    // and the config lock is taken only once.
    let (dimension, metric) = {
        let config = self.config.read();
        (config.dimension, config.metric)
    };
    // Validate all query dimensions before doing any search work.
    for query in queries {
        validate_dimension_match(dimension, query.len())?;
    }
    // Perf: Use parallel HNSW search (P0 optimization)
    let index_results = self
        .index
        .search_batch_parallel(queries, k, SearchQuality::Balanced);
    // Map results to SearchResult with full point data
    let vector_storage = self.vector_storage.read();
    let payload_storage = self.payload_storage.read();
    let results: Vec<Vec<SearchResult>> = index_results
        .into_iter()
        .zip(queries)
        .map(|(query_results, query)| {
            // Merge with delta buffer per query
            let query_results = self.merge_delta(query_results, query, k, metric);
            resolve::resolve_scored_results(&query_results, &*vector_storage, &*payload_storage)
        })
        .collect();
    Ok(results)
}
/// Performs multi-query search with result fusion.
///
/// Runs a parallel index search for every query vector and merges each query's
/// hits with the delta buffer. If a filter is given, it is applied to every
/// query's candidates. The per-query lists are then fused with the given
/// strategy. This suits Multiple Query Generation (MQG) pipelines, where several
/// reformulations of a user query are searched simultaneously.
///
/// Each query over-fetches a multiple of `top_k` candidates so that fusion has
/// enough overlap between the per-query lists to work with.
///
/// # Arguments
///
/// * `vectors` - Slice of query vectors (all must have the collection's dimension)
/// * `top_k` - Maximum number of results to return after fusion
/// * `fusion` - Strategy for combining results (Average, Maximum, RRF, Weighted)
/// * `filter` - Optional metadata filter applied to every query's candidates before fusion
///
/// # Returns
///
/// Up to `top_k` `SearchResult`s with vector and payload data attached, in the
/// order produced by the fusion strategy (fused score descending).
///
/// # Errors
///
/// Returns an error if:
/// - `vectors` is empty
/// - More than 10 vectors are provided (a fixed limit, not configurable)
/// - Any vector has incorrect dimension
/// - The fusion strategy fails (reported as `Error::Config`)
// `fusion` is kept by value for a stable public signature; presumably `fuse`
// only borrows it, which is what this lint flags.
#[allow(clippy::needless_pass_by_value)]
pub fn multi_query_search(
    &self,
    vectors: &[&[f32]],
    top_k: usize,
    fusion: crate::fusion::FusionStrategy,
    filter: Option<&crate::filter::Filter>,
) -> Result<Vec<SearchResult>> {
    let metric = self.validate_multi_query_inputs(vectors)?;
    let overfetch_k = Self::overfetch_factor(top_k);
    let batch_results = self.search_and_merge_delta(vectors, overfetch_k, metric);
    let filtered = self.apply_pre_fusion_filter(batch_results, filter);
    let fused = fusion
        .fuse(filtered)
        .map_err(|e| Error::Config(format!("Fusion error: {e}")))?;
    Ok(self.hydrate_fused_results(&fused, top_k))
}
/// Checks the `multi_query_search` inputs and returns the collection's distance metric.
///
/// The following are rejected with `Error::Config` or a dimension error:
/// - an empty `vectors` slice
/// - more than `MAX_VECTORS` (10) vectors
/// - any vector whose length differs from the configured dimension
fn validate_multi_query_inputs(&self, vectors: &[&[f32]]) -> Result<crate::DistanceMetric> {
    const MAX_VECTORS: usize = 10;
    match vectors.len() {
        0 => {
            return Err(Error::Config(
                "multi_query_search requires at least one vector".into(),
            ));
        }
        count if count > MAX_VECTORS => {
            return Err(Error::Config(format!(
                "multi_query_search supports at most {MAX_VECTORS} vectors, got {count}"
            )));
        }
        _ => {}
    }
    // Copy what we need out of the config, then release the guard before validating.
    let (dimension, metric) = {
        let guard = self.config.read();
        (guard.dimension, guard.metric)
    };
    for candidate in vectors {
        validate_dimension_match(dimension, candidate.len())?;
    }
    Ok(metric)
}
/// Returns how many candidates to fetch per query so fusion has enough overlap.
///
/// Small requests are over-fetched aggressively and large ones less so:
/// `top_k <= 10` gives 20x, `11..=50` gives 10x, `51..=100` gives 5x, and
/// anything larger gives 2x. The multiplication saturates at `usize::MAX`, so a
/// very large `top_k` (e.g. "return everything") can neither panic in debug
/// builds nor wrap to a tiny fetch size in release builds.
fn overfetch_factor(top_k: usize) -> usize {
    let multiplier = match top_k {
        0..=10 => 20,
        11..=50 => 10,
        51..=100 => 5,
        _ => 2,
    };
    top_k.saturating_mul(multiplier)
}
/// Searches the index for every query in parallel (fetching `overfetch_k`
/// candidates each), merges each query's hits with the delta buffer, and
/// converts the merged entries into `(id, score)` pairs for fusion.
///
/// The returned outer vector is in the same order as `vectors`.
fn search_and_merge_delta(
    &self,
    vectors: &[&[f32]],
    overfetch_k: usize,
    metric: crate::DistanceMetric,
) -> Vec<Vec<(u64, f32)>> {
    let per_query_hits =
        self.index
            .search_batch_parallel(vectors, overfetch_k, crate::SearchQuality::Balanced);
    let mut merged_batches: Vec<Vec<(u64, f32)>> = Vec::with_capacity(vectors.len());
    for (hits, query) in per_query_hits.into_iter().zip(vectors) {
        let merged = self.merge_delta(hits, query, overfetch_k, metric);
        merged_batches.push(merged.into_iter().map(Into::into).collect());
    }
    merged_batches
}
/// Applies metadata filter to batch results before fusion.
fn apply_pre_fusion_filter(
&self,
batch_results: Vec<Vec<(u64, f32)>>,
filter: Option<&crate::filter::Filter>,
) -> Vec<Vec<(u64, f32)>> {
let Some(f) = filter else {
return batch_results;
};
let payload_storage = self.payload_storage.read();
batch_results
.into_iter()
.map(|query_results| {
query_results
.into_iter()
.filter(|(id, _score)| {
if let Ok(Some(payload)) = payload_storage.retrieve(*id) {
f.matches(&payload)
} else {
false
}
})
.collect()
})
.collect()
}
/// Turns fused `(id, score)` pairs into full `SearchResult`s with vector and payload data.
///
/// Resolution and the `top_k` cutoff are delegated to
/// `resolve::resolve_id_score_pairs`, which runs while both storage read
/// guards are held.
fn hydrate_fused_results(&self, fused: &[(u64, f32)], top_k: usize) -> Vec<SearchResult> {
    let vectors_guard = self.vector_storage.read();
    let payloads_guard = self.payload_storage.read();
    let (vectors, payloads) = (&*vectors_guard, &*payloads_guard);
    resolve::resolve_id_score_pairs(fused, top_k, vectors, payloads)
}
/// Performs multi-query search returning only IDs and fused scores.
///
/// A faster variant of [`multi_query_search`](Self::multi_query_search) that
/// skips fetching vector and payload data; use it when you only need document
/// IDs. It performs the same input validation, over-fetching, and delta-buffer
/// merging as `multi_query_search`, but applies no metadata filter.
///
/// # Arguments
///
/// * `vectors` - Slice of query vectors
/// * `top_k` - Maximum number of results
/// * `fusion` - Fusion strategy
///
/// # Returns
///
/// Up to `top_k` `(id, fused_score)` tuples, in the order produced by the
/// fusion strategy (fused score descending).
///
/// # Errors
///
/// Returns an error if `vectors` is empty, contains more than 10 vectors, or
/// has a dimension mismatch. An error is also returned if the fusion strategy
/// fails (reported as `Error::Config`).
// `fusion` is kept by value for a stable public signature; presumably `fuse`
// only borrows it, which is what this lint flags.
#[allow(clippy::needless_pass_by_value)]
pub fn multi_query_search_ids(
    &self,
    vectors: &[&[f32]],
    top_k: usize,
    fusion: crate::fusion::FusionStrategy,
) -> Result<Vec<(u64, f32)>> {
    let metric = self.validate_multi_query_inputs(vectors)?;
    let overfetch_k = Self::overfetch_factor(top_k);
    let batch_results = self.search_and_merge_delta(vectors, overfetch_k, metric);
    let fused = fusion
        .fuse(batch_results)
        .map_err(|e| Error::Config(format!("Fusion error: {e}")))?;
    Ok(fused.into_iter().take(top_k).collect())
}
}