velesdb_core/collection/vector_collection/search.rs
1//! Search, match, aggregation, and query execution for `VectorCollection`.
2
3use std::collections::HashMap;
4
5use crate::error::Result;
6use crate::point::SearchResult;
7
8use super::VectorCollection;
9
10impl VectorCollection {
    /// Performs kNN vector search using the HNSW index.
    ///
    /// Returns the `k` nearest neighbors ordered by ascending distance.
    ///
    /// This method is a thin wrapper: `query` and `k` are forwarded unchanged
    /// to `self.inner.search` and its result is returned as-is.
    ///
    /// # Errors
    ///
    /// - Returns an error if the query dimension does not match the collection.
    /// - Returns an error if the HNSW index is not initialized.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use velesdb_core::{VectorCollection, DistanceMetric, StorageMode};
    /// # let coll = VectorCollection::create("./data/v".into(), "v", 128, DistanceMetric::Cosine, StorageMode::Full)?;
    /// let results = coll.search(&vec![0.1; 128], 10)?;
    /// for r in &results {
    ///     println!("id={} score={}", r.point.id, r.score);
    /// }
    /// # Ok::<(), velesdb_core::Error>(())
    /// ```
    pub fn search(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
        self.inner.search(query, k)
    }
34
    /// Performs full-text BM25 search over indexed payload fields.
    ///
    /// Returns up to `k` results ranked by BM25 relevance score.
    ///
    /// This method is a thin wrapper: `query` and `k` are forwarded unchanged
    /// to `self.inner.text_search` and its result is returned as-is.
    ///
    /// # Errors
    ///
    /// - Returns an error if storage retrieval fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use velesdb_core::{VectorCollection, DistanceMetric, StorageMode};
    /// # let coll = VectorCollection::create("./data/v".into(), "v", 128, DistanceMetric::Cosine, StorageMode::Full)?;
    /// let results = coll.text_search("machine learning", 5)?;
    /// # Ok::<(), velesdb_core::Error>(())
    /// ```
    pub fn text_search(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
        self.inner.text_search(query, k)
    }
54
    /// Performs kNN search with an explicit `ef_search` override.
    ///
    /// Higher `ef_search` values improve recall at the cost of latency.
    ///
    /// This method is a thin wrapper: `query`, `k` and `ef_search` are
    /// forwarded unchanged to `self.inner.search_with_ef`.
    ///
    /// # Errors
    ///
    /// - Returns an error if the query dimension does not match the collection.
    pub fn search_with_ef(
        &self,
        query: &[f32],
        k: usize,
        ef_search: usize,
    ) -> Result<Vec<SearchResult>> {
        self.inner.search_with_ef(query, k, ef_search)
    }
70
    /// Performs kNN search with a specific [`crate::SearchQuality`] profile.
    ///
    /// Use this instead of [`Self::search_with_ef`] when you want named
    /// quality modes like [`crate::SearchQuality::AutoTune`] that compute ef
    /// dynamically.
    ///
    /// This method is a thin wrapper: `query`, `k` and `quality` are forwarded
    /// unchanged to `self.inner.search_with_quality`.
    ///
    /// # Errors
    ///
    /// - Returns an error if the query dimension does not match the collection.
    pub fn search_with_quality(
        &self,
        query: &[f32],
        k: usize,
        quality: crate::SearchQuality,
    ) -> Result<Vec<SearchResult>> {
        self.inner.search_with_quality(query, k, quality)
    }
88
    /// Performs kNN search with a metadata filter applied post-retrieval.
    ///
    /// This method is a thin wrapper: `query`, `k` and `filter` are forwarded
    /// unchanged to `self.inner.search_with_filter`.
    ///
    /// # Errors
    ///
    /// - Returns an error if the query dimension does not match the collection.
    /// - Returns an error if the filter references an unsupported field type.
    pub fn search_with_filter(
        &self,
        query: &[f32],
        k: usize,
        filter: &crate::filter::Filter,
    ) -> Result<Vec<SearchResult>> {
        self.inner.search_with_filter(query, k, filter)
    }
103
    /// Returns [`crate::ScoredResult`] pairs without payload hydration.
    ///
    /// Faster than [`search`](Self::search) when only IDs and scores are needed.
    ///
    /// This method is a thin wrapper: `query` and `k` are forwarded unchanged
    /// to `self.inner.search_ids` and its result is returned as-is.
    ///
    /// # Errors
    ///
    /// - Returns an error if the query dimension does not match the collection.
    pub fn search_ids(
        &self,
        query: &[f32],
        k: usize,
    ) -> Result<Vec<crate::scored_result::ScoredResult>> {
        self.inner.search_ids(query, k)
    }
118
    /// Full-text search with metadata filter.
    ///
    /// This method is a thin wrapper: `query`, `k` and `filter` are forwarded
    /// unchanged to `self.inner.text_search_with_filter`.
    ///
    /// # Errors
    ///
    /// Returns an error if storage retrieval fails.
    pub fn text_search_with_filter(
        &self,
        query: &str,
        k: usize,
        filter: &crate::filter::Filter,
    ) -> Result<Vec<SearchResult>> {
        self.inner.text_search_with_filter(query, k, filter)
    }
132
    /// Performs hybrid search combining vector kNN and BM25 full-text via RRF fusion.
    ///
    /// When `alpha` is `None`, a default blending factor is used. Values closer
    /// to `1.0` weight vector results more; values closer to `0.0` weight text.
    ///
    /// `vector`, `text`, `k` and `alpha` are forwarded unchanged to
    /// `self.inner.hybrid_search`.
    ///
    /// # Errors
    ///
    /// - Returns an error if the query dimension does not match the collection.
    /// - Returns an error if text indexing or storage retrieval fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use velesdb_core::{VectorCollection, DistanceMetric, StorageMode};
    /// # let coll = VectorCollection::create("./data/v".into(), "v", 128, DistanceMetric::Cosine, StorageMode::Full)?;
    /// let results = coll.hybrid_search(&vec![0.1; 128], "machine learning", 10, Some(0.7))?;
    /// # Ok::<(), velesdb_core::Error>(())
    /// ```
    pub fn hybrid_search(
        &self,
        vector: &[f32],
        text: &str,
        k: usize,
        alpha: Option<f32>,
    ) -> Result<Vec<SearchResult>> {
        // NOTE(review): the trailing `None` fills an optional fifth argument of
        // the inner API that this wrapper does not expose — confirm its meaning
        // against `inner.hybrid_search`.
        self.inner.hybrid_search(vector, text, k, alpha, None)
    }
160
    /// Performs hybrid search (vector + BM25) with a metadata filter.
    ///
    /// `vector`, `text`, `k`, `alpha` and `filter` are forwarded unchanged to
    /// `self.inner.hybrid_search_with_filter`.
    ///
    /// # Errors
    ///
    /// - Returns an error if the query dimension does not match the collection.
    /// - Returns an error if text indexing, storage, or filtering fails.
    pub fn hybrid_search_with_filter(
        &self,
        vector: &[f32],
        text: &str,
        k: usize,
        alpha: Option<f32>,
        filter: &crate::filter::Filter,
    ) -> Result<Vec<SearchResult>> {
        // NOTE(review): the trailing `None` fills an optional sixth argument of
        // the inner API that this wrapper does not expose — confirm its meaning
        // against `inner.hybrid_search_with_filter`.
        self.inner
            .hybrid_search_with_filter(vector, text, k, alpha, filter, None)
    }
178
    /// Performs batch kNN search with per-query metadata filters.
    ///
    /// Each query in `queries` is paired with the filter at the same index in
    /// `filters`. Pass `None` for queries that should not be filtered.
    ///
    /// This method is a thin wrapper: `queries`, `k` and `filters` are
    /// forwarded unchanged to `self.inner.search_batch_with_filters`, which
    /// performs all validation.
    ///
    /// # Errors
    ///
    /// - Returns an error if any query dimension does not match the collection.
    /// - Returns an error if `queries` and `filters` have different lengths.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use velesdb_core::{VectorCollection, DistanceMetric, StorageMode};
    /// # let coll = VectorCollection::create("./data/v".into(), "v", 128, DistanceMetric::Cosine, StorageMode::Full)?;
    /// let q1 = vec![0.1; 128];
    /// let q2 = vec![0.2; 128];
    /// let results = coll.search_batch_with_filters(
    ///     &[q1.as_slice(), q2.as_slice()],
    ///     10,
    ///     &[None, None],
    /// )?;
    /// assert_eq!(results.len(), 2);
    /// # Ok::<(), velesdb_core::Error>(())
    /// ```
    pub fn search_batch_with_filters(
        &self,
        queries: &[&[f32]],
        k: usize,
        filters: &[Option<crate::filter::Filter>],
    ) -> Result<Vec<Vec<SearchResult>>> {
        self.inner.search_batch_with_filters(queries, k, filters)
    }
212
    /// Performs batch kNN search without filters, optimized for throughput.
    ///
    /// Uses rayon-parallelized HNSW search and result resolution for maximum
    /// queries-per-second. Prefer this over calling [`search`](Self::search)
    /// in a loop.
    ///
    /// This method is a thin wrapper: `queries` and `k` are forwarded unchanged
    /// to `self.inner.search_batch_parallel`, where the actual work happens.
    ///
    /// # Errors
    ///
    /// - Returns an error if any query dimension does not match the collection.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use velesdb_core::{VectorCollection, DistanceMetric, StorageMode};
    /// # let coll = VectorCollection::create("./data/v".into(), "v", 128, DistanceMetric::Cosine, StorageMode::Full)?;
    /// let q1 = vec![0.1; 128];
    /// let q2 = vec![0.2; 128];
    /// let results = coll.search_batch_parallel(&[q1.as_slice(), q2.as_slice()], 10)?;
    /// assert_eq!(results.len(), 2);
    /// # Ok::<(), velesdb_core::Error>(())
    /// ```
    pub fn search_batch_parallel(
        &self,
        queries: &[&[f32]],
        k: usize,
    ) -> Result<Vec<Vec<SearchResult>>> {
        self.inner.search_batch_parallel(queries, k)
    }
241
    /// Performs multi-query search fusing results from multiple query vectors.
    ///
    /// `queries`, `k`, `strategy` and the optional `filter` are forwarded
    /// unchanged to `self.inner.multi_query_search`.
    ///
    /// # Errors
    ///
    /// - Returns an error if any query dimension does not match the collection.
    /// - Returns an error if the fusion strategy fails.
    pub fn multi_query_search(
        &self,
        queries: &[&[f32]],
        k: usize,
        strategy: crate::fusion::FusionStrategy,
        filter: Option<&crate::filter::Filter>,
    ) -> Result<Vec<SearchResult>> {
        self.inner.multi_query_search(queries, k, strategy, filter)
    }
257
    /// Performs multi-query search returning only IDs and fused scores.
    ///
    /// Each returned `(u64, f32)` entry pairs an ID with its fused score.
    /// `queries`, `k` and `strategy` are forwarded unchanged to
    /// `self.inner.multi_query_search_ids`; unlike
    /// [`Self::multi_query_search`], this variant takes no filter.
    ///
    /// # Errors
    ///
    /// - Returns an error if any query dimension does not match the collection.
    /// - Returns an error if the fusion strategy fails.
    pub fn multi_query_search_ids(
        &self,
        queries: &[&[f32]],
        k: usize,
        strategy: crate::fusion::FusionStrategy,
    ) -> Result<Vec<(u64, f32)>> {
        self.inner.multi_query_search_ids(queries, k, strategy)
    }
272
273 /// Performs sparse-only search on the named index.
274 ///
275 /// # Errors
276 ///
277 /// Returns an error if the named sparse index does not exist.
278 pub fn sparse_search(
279 &self,
280 query: &crate::index::sparse::SparseVector,
281 k: usize,
282 index_name: &str,
283 ) -> Result<Vec<SearchResult>> {
284 let indexes = self.inner.sparse_indexes.read();
285 let index = indexes.get(index_name).ok_or_else(|| {
286 crate::error::Error::Config(format!(
287 "Sparse index '{}' not found",
288 if index_name.is_empty() {
289 "<default>"
290 } else {
291 index_name
292 }
293 ))
294 })?;
295 let results = crate::index::sparse::sparse_search(index, query, k);
296 drop(indexes);
297 Ok(self.inner.resolve_sparse_results(&results, k))
298 }
299
300 /// Performs hybrid dense+sparse search with RRF fusion.
301 ///
302 /// # Errors
303 ///
304 /// Returns an error if dense or sparse search fails, or fusion errors.
305 #[allow(clippy::too_many_arguments)]
306 pub fn hybrid_sparse_search(
307 &self,
308 dense_vector: &[f32],
309 sparse_query: &crate::index::sparse::SparseVector,
310 k: usize,
311 index_name: &str,
312 strategy: &crate::fusion::FusionStrategy,
313 ) -> Result<Vec<SearchResult>> {
314 let candidate_k = k.saturating_mul(2).max(k + 10);
315
316 let (dense_results, sparse_results) = self.inner.execute_both_branches(
317 dense_vector,
318 sparse_query,
319 index_name,
320 candidate_k,
321 None,
322 );
323
324 if dense_results.is_empty() && sparse_results.is_empty() {
325 return Ok(Vec::new());
326 }
327 if dense_results.is_empty() {
328 let scored: Vec<(u64, f32)> = sparse_results
329 .iter()
330 .map(|sd| (sd.doc_id, sd.score))
331 .collect();
332 return Ok(self.inner.resolve_fused_results(&scored, k));
333 }
334 if sparse_results.is_empty() {
335 return Ok(self.inner.resolve_fused_results(&dense_results, k));
336 }
337
338 let sparse_tuples: Vec<(u64, f32)> = sparse_results
339 .iter()
340 .map(|sd| (sd.doc_id, sd.score))
341 .collect();
342
343 let fused = strategy
344 .fuse(vec![dense_results, sparse_tuples])
345 .map_err(|e| crate::error::Error::Config(format!("Fusion error: {e}")))?;
346
347 Ok(self.inner.resolve_fused_results(&fused, k))
348 }
349
350 /// Executes a graph MATCH query against the collection's edge store.
351 ///
352 /// # Errors
353 ///
354 /// - Returns an error if the match clause references an invalid label or property.
355 /// - Returns an error if the edge store is not initialized.
356 pub fn execute_match(
357 &self,
358 match_clause: &crate::velesql::MatchClause,
359 params: &std::collections::HashMap<String, serde_json::Value>,
360 ) -> crate::error::Result<Vec<crate::collection::search::query::match_exec::MatchResult>> {
361 self.inner.execute_match(match_clause, params)
362 }
363
364 /// Executes a MATCH query with vector similarity filtering.
365 ///
366 /// # Errors
367 ///
368 /// - Returns an error if the match clause is invalid or the query dimension mismatches.
369 pub fn execute_match_with_similarity(
370 &self,
371 match_clause: &crate::velesql::MatchClause,
372 query_vector: &[f32],
373 threshold: f32,
374 params: &std::collections::HashMap<String, serde_json::Value>,
375 ) -> crate::error::Result<Vec<crate::collection::search::query::match_exec::MatchResult>> {
376 self.inner
377 .execute_match_with_similarity(match_clause, query_vector, threshold, params)
378 }
379
380 /// Executes an aggregation query (GROUP BY / COUNT / SUM / AVG / MIN / MAX).
381 ///
382 /// # Errors
383 ///
384 /// - Returns an error if the query is invalid or aggregation computation fails.
385 pub fn execute_aggregate(
386 &self,
387 query: &crate::velesql::Query,
388 params: &std::collections::HashMap<String, serde_json::Value>,
389 ) -> Result<serde_json::Value> {
390 self.inner.execute_aggregate(query, params)
391 }
392
    /// Executes a parsed `VelesQL` query.
    ///
    /// `query` and `params` are forwarded unchanged to
    /// `self.inner.execute_query` and its result is returned as-is.
    ///
    /// # Errors
    ///
    /// - Returns an error if the query references missing fields or execution fails.
    pub fn execute_query(
        &self,
        query: &crate::velesql::Query,
        params: &HashMap<String, serde_json::Value>,
    ) -> Result<Vec<SearchResult>> {
        self.inner.execute_query(query, params)
    }
405
    /// Executes a query with instrumentation and returns plan + actual stats.
    ///
    /// `query` and `params` are forwarded unchanged to
    /// `self.inner.explain_analyze_query`. See also
    /// [`crate::Database::explain_analyze_query`].
    ///
    /// # Errors
    ///
    /// Returns an error if the query is invalid or execution fails.
    pub fn explain_analyze_query(
        &self,
        query: &crate::velesql::Query,
        params: &HashMap<String, serde_json::Value>,
    ) -> Result<crate::velesql::ExplainOutput> {
        // NOTE(review): earlier docs said this delegates to
        // `Database::explain_analyze_query`, but the call below goes to the
        // inner collection — confirm how the two relate.
        self.inner.explain_analyze_query(query, params)
    }
420
    /// Sends a point into the streaming ingestion channel.
    ///
    /// Returns `Ok(())` on success (202 semantics). Returns
    /// `BackpressureError::BufferFull` when the channel is at capacity, or
    /// `BackpressureError::NotConfigured` if streaming is not active.
    ///
    /// Only compiled when the `persistence` feature is enabled. `point` is
    /// moved into `self.inner.stream_insert`.
    ///
    /// # Errors
    ///
    /// Returns `BackpressureError` on buffer-full or not-configured.
    #[cfg(feature = "persistence")]
    pub fn stream_insert(
        &self,
        point: crate::point::Point,
    ) -> std::result::Result<(), crate::collection::streaming::BackpressureError> {
        self.inner.stream_insert(point)
    }
437
    /// Sends a batch of points into the streaming ingestion channel.
    ///
    /// Acquires the ingester lock once for the entire batch, eliminating
    /// per-point lock overhead. Returns the number of points successfully
    /// queued. Companion to [`Self::stream_insert`] for single-point sends.
    ///
    /// Only compiled when the `persistence` feature is enabled. `points` is
    /// moved into `self.inner.stream_insert_batch`.
    ///
    /// # Errors
    ///
    /// Returns `BackpressureError` on buffer-full, drain-dead, or not-configured.
    #[cfg(feature = "persistence")]
    pub fn stream_insert_batch(
        &self,
        points: Vec<crate::point::Point>,
    ) -> std::result::Result<usize, crate::collection::streaming::BackpressureError> {
        self.inner.stream_insert_batch(points)
    }
454
    /// Pushes `(id, vector)` entries into the delta buffer if it is active.
    ///
    /// No-op when the delta buffer is inactive. This is the public interface
    /// used by streaming upsert handlers (e.g., NDJSON stream endpoint) to
    /// keep the delta buffer in sync after a successful `upsert_bulk` call.
    ///
    /// Only compiled when the `persistence` feature is enabled. `entries` is
    /// forwarded unchanged to `self.inner.push_to_delta_if_active`; nothing is
    /// returned to the caller.
    #[cfg(feature = "persistence")]
    pub fn push_to_delta_if_active(&self, entries: &[(u64, Vec<f32>)]) {
        self.inner.push_to_delta_if_active(entries);
    }
464
    /// Returns `true` if the delta buffer is currently active (HNSW rebuild
    /// in progress). External callers can use this to decide whether to
    /// snapshot entries for delta before a `upsert_bulk` call.
    ///
    /// Only compiled when the `persistence` feature is enabled. The value is
    /// read from `self.inner.delta_buffer.is_active()` at call time.
    #[cfg(feature = "persistence")]
    #[must_use]
    pub fn is_delta_active(&self) -> bool {
        self.inner.delta_buffer.is_active()
    }
473
    /// Enables streaming ingestion on this collection.
    ///
    /// Creates a [`StreamIngester`](crate::collection::streaming::StreamIngester) with
    /// the given `config` and stores it internally. Points can then be submitted via
    /// [`stream_insert`](Self::stream_insert) or [`stream_insert_batch`](Self::stream_insert_batch).
    ///
    /// Calling this when streaming is already active replaces the existing
    /// ingester (the old drain task is aborted via `Drop`).
    ///
    /// Only compiled when the `persistence` feature is enabled. `config` is
    /// moved into `self.inner.enable_streaming`, which does the actual setup.
    #[cfg(feature = "persistence")]
    pub fn enable_streaming(&self, config: crate::collection::streaming::StreamingConfig) {
        self.inner.enable_streaming(config);
    }
486
    /// Executes a raw VelesQL string, parsing it before execution.
    ///
    /// `sql` and `params` are forwarded unchanged to
    /// `self.inner.execute_query_str`, which handles both parsing and
    /// execution. Use [`Self::execute_query`] when the query is already parsed.
    ///
    /// # Errors
    ///
    /// - Returns an error if the SQL string cannot be parsed.
    /// - Returns an error if query execution fails.
    pub fn execute_query_str(
        &self,
        sql: &str,
        params: &HashMap<String, serde_json::Value>,
    ) -> Result<Vec<SearchResult>> {
        self.inner.execute_query_str(sql, params)
    }
500
    /// Reorders HNSW graph nodes in BFS traversal order for improved cache locality.
    ///
    /// After bulk insertion, nodes are stored in insertion order. Calling this
    /// method once after loading vectors reorders both the vector buffer and all
    /// adjacency lists so nodes traversed together during search are close in
    /// memory, reducing L2/L3 cache misses by 15–30% on collections with ≥ 1 000
    /// vectors (issue #377).
    ///
    /// Also builds a PDX block-columnar layout for SIMD-parallel distance
    /// computation when the columnar search path is enabled.
    ///
    /// This method is a thin wrapper: all of the work described above happens
    /// in `self.inner.reorder_for_locality`, whose result is returned as-is.
    ///
    /// # When to call
    ///
    /// After [`Self::upsert`] bulk-loading for a new collection, before the
    /// collection is opened for queries. No-op for collections with fewer than
    /// 1 000 vectors.
    ///
    /// # Errors
    ///
    /// Returns an error if vector storage reordering fails.
    pub fn reorder_for_locality(&self) -> Result<()> {
        self.inner.reorder_for_locality()
    }
524}