use tracing::{debug, warn};
use crate::bridge::envelope::{ErrorCode, Response};
use crate::data::executor::core_loop::CoreLoop;
use crate::data::executor::task::ExecutionTask;
use crate::engine::vector::collection::VectorCollection;
fn build_search_hit(
id: u32,
distance: f32,
doc_id: Option<&str>,
) -> super::super::response_codec::VectorSearchHit {
super::super::response_codec::VectorSearchHit {
id,
distance,
doc_id: doc_id.map(String::from),
}
}
fn resolve_doc_id(collection: Option<&VectorCollection>, vector_id: u32) -> Option<&str> {
collection.and_then(|c| c.get_doc_id(vector_id))
}
fn encode_hits_response(
core: &CoreLoop,
task: &ExecutionTask,
hits: &Vec<super::super::response_codec::VectorSearchHit>,
) -> Response {
match super::super::response_codec::encode(hits) {
Ok(payload) => core.response_with_payload(task, payload),
Err(e) => {
warn!(core = core.core_id, error = %e, "vector search serialization failed");
core.response_error(
task,
ErrorCode::Internal {
detail: e.to_string(),
},
)
}
}
}
pub(in crate::data::executor) struct VectorSearchParams<'a> {
pub task: &'a ExecutionTask,
pub tid: u32,
pub collection: &'a str,
pub query_vector: &'a [f32],
pub top_k: usize,
pub ef_search: usize,
pub filter_bitmap: Option<&'a std::sync::Arc<[u8]>>,
pub field_name: &'a str,
pub rls_filters: &'a [u8],
}
pub(in crate::data::executor) struct VectorMultiSearchParams<'a> {
pub task: &'a ExecutionTask,
pub tid: u32,
pub collection: &'a str,
pub query_vector: &'a [f32],
pub top_k: usize,
pub ef_search: usize,
pub filter_bitmap: Option<&'a std::sync::Arc<[u8]>>,
pub rls_filters: &'a [u8],
}
impl CoreLoop {
pub(in crate::data::executor) fn execute_vector_search(
&self,
params: VectorSearchParams<'_>,
) -> Response {
let VectorSearchParams {
task,
tid,
collection,
query_vector,
top_k,
ef_search,
filter_bitmap,
field_name,
rls_filters,
} = params;
debug!(core = self.core_id, %collection, top_k, ef_search, "vector search");
let index_key = CoreLoop::vector_index_key(tid, collection, field_name);
if let Some(ivf) = self.ivf_indexes.get(&index_key) {
return self.search_ivf(task, &index_key, ivf, query_vector, top_k, filter_bitmap);
}
let Some(collection_ref) = self.vector_collections.get(&index_key) else {
return self.response_error(task, ErrorCode::NotFound);
};
if collection_ref.is_empty() {
return self.response_with_payload(task, b"[]".to_vec());
}
let fetch_k = if rls_filters.is_empty() {
top_k
} else {
top_k.saturating_mul(2).max(20)
};
let ef = effective_ef(ef_search, fetch_k);
let results = match filter_bitmap {
Some(bitmap_bytes) => {
collection_ref.search_with_bitmap_bytes(query_vector, fetch_k, ef, bitmap_bytes)
}
None => collection_ref.search(query_vector, fetch_k, ef),
};
let hits: Vec<_> = if rls_filters.is_empty() {
results
.iter()
.map(|r| build_search_hit(r.id, r.distance, collection_ref.get_doc_id(r.id)))
.collect()
} else {
results
.iter()
.filter(|r| {
let doc_id_str = match collection_ref.get_doc_id(r.id) {
Some(id) if !id.is_empty() => id,
_ => return false,
};
match self.sparse.get(tid, collection, doc_id_str) {
Ok(Some(bytes)) => {
super::rls_eval::rls_check_msgpack_bytes(rls_filters, &bytes)
}
_ => false,
}
})
.take(top_k)
.map(|r| build_search_hit(r.id, r.distance, collection_ref.get_doc_id(r.id)))
.collect()
};
if let Some(ref m) = self.metrics {
m.record_vector_search(0);
m.record_query_by_engine("vector");
}
encode_hits_response(self, task, &hits)
}
fn search_ivf(
&self,
task: &ExecutionTask,
index_key: &str,
ivf: &crate::engine::vector::ivf::IvfPqIndex,
query_vector: &[f32],
top_k: usize,
filter_bitmap: Option<&std::sync::Arc<[u8]>>,
) -> Response {
if ivf.is_empty() {
return self.response_with_payload(task, b"[]".to_vec());
}
let fetch_k = if filter_bitmap.is_some() {
top_k * self.query_tuning.bitmap_over_fetch_factor
} else {
top_k
};
let results = ivf.search(query_vector, fetch_k);
let doc_id_source = self.vector_collections.get(index_key);
let mut hits: Vec<_> = results
.iter()
.map(|r| build_search_hit(r.id, r.distance, resolve_doc_id(doc_id_source, r.id)))
.collect();
if let Some(bitmap_bytes) = filter_bitmap {
if let Ok(bm) = crate::query::bitmap::deserialize(bitmap_bytes) {
hits.retain(|h| bm.contains(h.id));
}
hits.truncate(top_k);
}
if let Some(ref m) = self.metrics {
m.record_vector_search(0);
m.record_query_by_engine("vector");
}
encode_hits_response(self, task, &hits)
}
pub(in crate::data::executor) fn execute_vector_multi_search(
&self,
params: VectorMultiSearchParams<'_>,
) -> Response {
let VectorMultiSearchParams {
task,
tid,
collection,
query_vector,
top_k,
ef_search,
filter_bitmap,
rls_filters,
} = params;
debug!(core = self.core_id, %collection, top_k, "vector multi-search");
let prefix = format!("{tid}:{collection}:");
let plain_key = CoreLoop::vector_index_key(tid, collection, "");
let mut all_results: Vec<Vec<crate::engine::vector::hnsw::SearchResult>> = Vec::new();
for (key, coll) in &self.vector_collections {
if key == &plain_key || key.starts_with(&prefix) {
if coll.is_empty() || coll.dim() != query_vector.len() {
continue;
}
let ef = effective_ef(ef_search, top_k);
let results = match filter_bitmap {
Some(bm) => coll.search_with_bitmap_bytes(query_vector, top_k, ef, bm),
None => coll.search(query_vector, top_k, ef),
};
all_results.push(results);
}
}
if all_results.is_empty() {
return self.response_error(task, ErrorCode::NotFound);
}
if all_results.len() == 1 {
let Some(results) = all_results.into_iter().next() else {
return self.response_error(task, ErrorCode::NotFound);
};
let doc_source = self.vector_collections.get(&plain_key);
let hits: Vec<_> = results
.iter()
.filter(|r| {
if rls_filters.is_empty() {
return true;
}
let doc_id_str = resolve_doc_id(doc_source, r.id).unwrap_or("");
if doc_id_str.is_empty() {
return false;
}
match self.sparse.get(tid, collection, doc_id_str) {
Ok(Some(bytes)) => {
super::rls_eval::rls_check_msgpack_bytes(rls_filters, &bytes)
}
_ => false,
}
})
.take(top_k)
.map(|r| build_search_hit(r.id, r.distance, resolve_doc_id(doc_source, r.id)))
.collect();
if let Some(ref m) = self.metrics {
m.record_vector_search(0);
m.record_query_by_engine("vector");
}
return encode_hits_response(self, task, &hits);
}
use crate::query::fusion::{RankedResult, reciprocal_rank_fusion};
let ranked_lists: Vec<Vec<RankedResult>> = all_results
.iter()
.map(|results| {
results
.iter()
.enumerate()
.map(|(rank, r)| RankedResult {
document_id: r.id.to_string(),
rank,
score: r.distance,
source: "vector",
})
.collect()
})
.collect();
let fused = reciprocal_rank_fusion(&ranked_lists, None, top_k);
let hits: Vec<_> = fused
.iter()
.filter_map(|f| {
let id: u32 = f.document_id.parse().ok()?;
let doc_id = self
.vector_collections
.get(&plain_key)
.and_then(|c| c.get_doc_id(id))
.or_else(|| {
self.vector_collections
.iter()
.filter(|(k, _)| *k == &plain_key || k.starts_with(&prefix))
.find_map(|(_, c)| c.get_doc_id(id))
});
if !rls_filters.is_empty() {
let doc_id_str = doc_id.unwrap_or("");
if doc_id_str.is_empty() {
return None;
}
match self.sparse.get(tid, collection, doc_id_str) {
Ok(Some(bytes))
if super::rls_eval::rls_check_msgpack_bytes(rls_filters, &bytes) => {}
_ => return None,
}
}
Some(build_search_hit(id, f.rrf_score as f32, doc_id))
})
.collect();
if let Some(ref m) = self.metrics {
m.record_vector_search(0);
m.record_query_by_engine("vector");
}
encode_hits_response(self, task, &hits)
}
}
fn effective_ef(ef_search: usize, top_k: usize) -> usize {
if ef_search > 0 {
ef_search.max(top_k)
} else {
top_k.saturating_mul(4).max(64)
}
}