Skip to main content

velesdb_server/handlers/search/
batch.rs

1//! Batch search handler: multiple vector queries in a single request.
2
3use axum::{
4    extract::{Path, State},
5    http::StatusCode,
6    response::IntoResponse,
7    Json,
8};
9use std::sync::Arc;
10
11use crate::types::{BatchSearchRequest, BatchSearchResponse, ErrorResponse, SearchResponse};
12use crate::AppState;
13
14use super::pipeline::{
15    actionable_search_error, build_search_response, record_circuit_breaker,
16    validate_query_dimension,
17};
18use crate::handlers::helpers::{
19    apply_pre_check, extract_client_id, get_vector_collection_or_404, notify_query_timing,
20};
21
22/// Batch search for multiple vectors.
23#[utoipa::path(
24    post,
25    path = "/collections/{name}/search/batch",
26    tag = "search",
27    params(
28        ("name" = String, Path, description = "Collection name")
29    ),
30    request_body = BatchSearchRequest,
31    responses(
32        (status = 200, description = "Batch search results", body = BatchSearchResponse),
33        (status = 404, description = "Collection not found", body = ErrorResponse),
34        (status = 400, description = "Invalid request", body = ErrorResponse)
35    )
36)]
37#[allow(clippy::unused_async)]
38pub async fn batch_search(
39    State(state): State<Arc<AppState>>,
40    headers: axum::http::HeaderMap,
41    Path(name): Path<String>,
42    Json(req): Json<BatchSearchRequest>,
43) -> impl IntoResponse {
44    let start = std::time::Instant::now();
45    state.onboarding_metrics.record_search_request();
46
47    let collection = match get_vector_collection_or_404(&state, &name) {
48        Ok(c) => c,
49        Err(resp) => return resp,
50    };
51
52    // Record query type only after confirming the collection exists, so
53    // 404s do not inflate queries_total or vector_queries.
54    state.operational_metrics.record_vector_query();
55
56    let client_id = extract_client_id(&headers);
57    if let Err(resp) = apply_pre_check(collection.guard_rails(), &client_id) {
58        state.operational_metrics.inc_rate_limited();
59        return resp;
60    }
61
62    if let Err(resp) = validate_batch_dimensions(&state, &name, &collection, &req) {
63        state.operational_metrics.inc_errors();
64        return resp;
65    }
66
67    let filters = match parse_batch_filters(&state, &req) {
68        Ok(f) => f,
69        Err(resp) => {
70            state.operational_metrics.inc_errors();
71            return resp;
72        }
73    };
74
75    let queries: Vec<&[f32]> = req.searches.iter().map(|s| s.vector.as_slice()).collect();
76    let max_top_k = req.searches.iter().map(|s| s.top_k).max().unwrap_or(10);
77
78    let batch_result = run_batch_search(&collection, &queries, max_top_k, &filters);
79    record_circuit_breaker(&collection, &batch_result);
80
81    let all_results = match batch_result {
82        Ok(batch_results) => build_batch_responses(&state, batch_results, &req),
83        Err(e) => {
84            state.operational_metrics.inc_errors();
85            return (StatusCode::BAD_REQUEST, Json(actionable_search_error(&e))).into_response();
86        }
87    };
88
89    finish_batch_search(&state, &name, start, all_results)
90}
91
92/// Dispatch a batch search to the throughput-optimized parallel kernel when no
93/// query carries a filter, falling back to the per-query filtered kernel
94/// otherwise. Both paths share the same HNSW traversal, so results are
95/// identical for the unfiltered case — only the rayon parallelism differs.
96fn run_batch_search(
97    collection: &velesdb_core::collection::VectorCollection,
98    queries: &[&[f32]],
99    max_top_k: usize,
100    filters: &[Option<velesdb_core::Filter>],
101) -> velesdb_core::Result<Vec<Vec<velesdb_core::SearchResult>>> {
102    if filters.iter().all(Option::is_none) {
103        collection.search_batch_parallel(queries, max_top_k)
104    } else {
105        collection.search_batch_with_filters(queries, max_top_k, filters)
106    }
107}
108
109/// Record timing metrics and build the final batch response envelope.
110fn finish_batch_search(
111    state: &AppState,
112    name: &str,
113    start: std::time::Instant,
114    results: Vec<SearchResponse>,
115) -> axum::response::Response {
116    let elapsed = start.elapsed();
117    let timing_ms = elapsed.as_secs_f64() * 1000.0;
118    notify_query_timing(state, name, start);
119    state
120        .query_duration_histogram
121        .observe(elapsed.as_secs_f64());
122
123    Json(BatchSearchResponse { results, timing_ms }).into_response()
124}
125
126/// Validate that every query vector in a batch request matches the collection dimension.
127#[allow(clippy::result_large_err)]
128fn validate_batch_dimensions(
129    state: &AppState,
130    name: &str,
131    collection: &velesdb_core::collection::VectorCollection,
132    req: &BatchSearchRequest,
133) -> Result<(), axum::response::Response> {
134    let expected_dimension = collection.config().dimension;
135    for (idx, search) in req.searches.iter().enumerate() {
136        if let Err(error) =
137            validate_query_dimension(state, name, expected_dimension, &search.vector)
138        {
139            return Err((
140                StatusCode::BAD_REQUEST,
141                Json(ErrorResponse {
142                    error: format!("Invalid query at index {idx}: {}", error.error),
143                    code: error.code.clone(),
144                }),
145            )
146                .into_response());
147        }
148    }
149    Ok(())
150}
151
152/// Parse filters from each search in a batch request.
153#[allow(clippy::result_large_err)]
154fn parse_batch_filters(
155    state: &AppState,
156    req: &BatchSearchRequest,
157) -> Result<Vec<Option<velesdb_core::Filter>>, axum::response::Response> {
158    let mut filters: Vec<Option<velesdb_core::Filter>> = Vec::with_capacity(req.searches.len());
159    for (idx, search) in req.searches.iter().enumerate() {
160        if let Some(filter_json) = &search.filter {
161            match serde_json::from_value(filter_json.clone()) {
162                Ok(filter) => filters.push(Some(filter)),
163                Err(e) => {
164                    state.onboarding_metrics.record_filter_parse_error();
165                    return Err((
166                        StatusCode::BAD_REQUEST,
167                        Json(ErrorResponse {
168                            error: format!(
169                                "Invalid filter at index {idx}: {e}. Hint: validate filter syntax and start with a broader query before reintroducing strict filters."
170                            ),
171                            code: None,
172                        }),
173                    )
174                        .into_response());
175                }
176            }
177        } else {
178            filters.push(None);
179        }
180    }
181    Ok(filters)
182}
183
184/// Convert batch search results into response objects, recording metrics for empty results.
185fn build_batch_responses(
186    state: &AppState,
187    batch_results: Vec<Vec<velesdb_core::SearchResult>>,
188    req: &BatchSearchRequest,
189) -> Vec<SearchResponse> {
190    let empty_count = batch_results
191        .iter()
192        .filter(|results| results.is_empty())
193        .count();
194    for _ in 0..empty_count {
195        state.onboarding_metrics.record_empty_search_results();
196    }
197    debug_assert_eq!(
198        batch_results.len(),
199        req.searches.len(),
200        "search_batch_with_filters must return one result-vec per query"
201    );
202    batch_results
203        .into_iter()
204        .zip(req.searches.iter())
205        .map(|(results, search)| {
206            let truncated: Vec<_> = results.into_iter().take(search.top_k).collect();
207            build_search_response(truncated)
208        })
209        .collect()
210}