velesdb_server/handlers/search/
batch.rs1use axum::{
4 extract::{Path, State},
5 http::StatusCode,
6 response::IntoResponse,
7 Json,
8};
9use std::sync::Arc;
10
11use crate::types::{BatchSearchRequest, BatchSearchResponse, ErrorResponse, SearchResponse};
12use crate::AppState;
13
14use super::pipeline::{
15 actionable_search_error, build_search_response, record_circuit_breaker,
16 validate_query_dimension,
17};
18use crate::handlers::helpers::{
19 apply_pre_check, extract_client_id, get_vector_collection_or_404, notify_query_timing,
20};
21
22#[utoipa::path(
24 post,
25 path = "/collections/{name}/search/batch",
26 tag = "search",
27 params(
28 ("name" = String, Path, description = "Collection name")
29 ),
30 request_body = BatchSearchRequest,
31 responses(
32 (status = 200, description = "Batch search results", body = BatchSearchResponse),
33 (status = 404, description = "Collection not found", body = ErrorResponse),
34 (status = 400, description = "Invalid request", body = ErrorResponse)
35 )
36)]
37#[allow(clippy::unused_async)]
38pub async fn batch_search(
39 State(state): State<Arc<AppState>>,
40 headers: axum::http::HeaderMap,
41 Path(name): Path<String>,
42 Json(req): Json<BatchSearchRequest>,
43) -> impl IntoResponse {
44 let start = std::time::Instant::now();
45 state.onboarding_metrics.record_search_request();
46
47 let collection = match get_vector_collection_or_404(&state, &name) {
48 Ok(c) => c,
49 Err(resp) => return resp,
50 };
51
52 state.operational_metrics.record_vector_query();
55
56 let client_id = extract_client_id(&headers);
57 if let Err(resp) = apply_pre_check(collection.guard_rails(), &client_id) {
58 state.operational_metrics.inc_rate_limited();
59 return resp;
60 }
61
62 if let Err(resp) = validate_batch_dimensions(&state, &name, &collection, &req) {
63 state.operational_metrics.inc_errors();
64 return resp;
65 }
66
67 let filters = match parse_batch_filters(&state, &req) {
68 Ok(f) => f,
69 Err(resp) => {
70 state.operational_metrics.inc_errors();
71 return resp;
72 }
73 };
74
75 let queries: Vec<&[f32]> = req.searches.iter().map(|s| s.vector.as_slice()).collect();
76 let max_top_k = req.searches.iter().map(|s| s.top_k).max().unwrap_or(10);
77
78 let batch_result = run_batch_search(&collection, &queries, max_top_k, &filters);
79 record_circuit_breaker(&collection, &batch_result);
80
81 let all_results = match batch_result {
82 Ok(batch_results) => build_batch_responses(&state, batch_results, &req),
83 Err(e) => {
84 state.operational_metrics.inc_errors();
85 return (StatusCode::BAD_REQUEST, Json(actionable_search_error(&e))).into_response();
86 }
87 };
88
89 finish_batch_search(&state, &name, start, all_results)
90}
91
92fn run_batch_search(
97 collection: &velesdb_core::collection::VectorCollection,
98 queries: &[&[f32]],
99 max_top_k: usize,
100 filters: &[Option<velesdb_core::Filter>],
101) -> velesdb_core::Result<Vec<Vec<velesdb_core::SearchResult>>> {
102 if filters.iter().all(Option::is_none) {
103 collection.search_batch_parallel(queries, max_top_k)
104 } else {
105 collection.search_batch_with_filters(queries, max_top_k, filters)
106 }
107}
108
109fn finish_batch_search(
111 state: &AppState,
112 name: &str,
113 start: std::time::Instant,
114 results: Vec<SearchResponse>,
115) -> axum::response::Response {
116 let elapsed = start.elapsed();
117 let timing_ms = elapsed.as_secs_f64() * 1000.0;
118 notify_query_timing(state, name, start);
119 state
120 .query_duration_histogram
121 .observe(elapsed.as_secs_f64());
122
123 Json(BatchSearchResponse { results, timing_ms }).into_response()
124}
125
126#[allow(clippy::result_large_err)]
128fn validate_batch_dimensions(
129 state: &AppState,
130 name: &str,
131 collection: &velesdb_core::collection::VectorCollection,
132 req: &BatchSearchRequest,
133) -> Result<(), axum::response::Response> {
134 let expected_dimension = collection.config().dimension;
135 for (idx, search) in req.searches.iter().enumerate() {
136 if let Err(error) =
137 validate_query_dimension(state, name, expected_dimension, &search.vector)
138 {
139 return Err((
140 StatusCode::BAD_REQUEST,
141 Json(ErrorResponse {
142 error: format!("Invalid query at index {idx}: {}", error.error),
143 code: error.code.clone(),
144 }),
145 )
146 .into_response());
147 }
148 }
149 Ok(())
150}
151
152#[allow(clippy::result_large_err)]
154fn parse_batch_filters(
155 state: &AppState,
156 req: &BatchSearchRequest,
157) -> Result<Vec<Option<velesdb_core::Filter>>, axum::response::Response> {
158 let mut filters: Vec<Option<velesdb_core::Filter>> = Vec::with_capacity(req.searches.len());
159 for (idx, search) in req.searches.iter().enumerate() {
160 if let Some(filter_json) = &search.filter {
161 match serde_json::from_value(filter_json.clone()) {
162 Ok(filter) => filters.push(Some(filter)),
163 Err(e) => {
164 state.onboarding_metrics.record_filter_parse_error();
165 return Err((
166 StatusCode::BAD_REQUEST,
167 Json(ErrorResponse {
168 error: format!(
169 "Invalid filter at index {idx}: {e}. Hint: validate filter syntax and start with a broader query before reintroducing strict filters."
170 ),
171 code: None,
172 }),
173 )
174 .into_response());
175 }
176 }
177 } else {
178 filters.push(None);
179 }
180 }
181 Ok(filters)
182}
183
184fn build_batch_responses(
186 state: &AppState,
187 batch_results: Vec<Vec<velesdb_core::SearchResult>>,
188 req: &BatchSearchRequest,
189) -> Vec<SearchResponse> {
190 let empty_count = batch_results
191 .iter()
192 .filter(|results| results.is_empty())
193 .count();
194 for _ in 0..empty_count {
195 state.onboarding_metrics.record_empty_search_results();
196 }
197 debug_assert_eq!(
198 batch_results.len(),
199 req.searches.len(),
200 "search_batch_with_filters must return one result-vec per query"
201 );
202 batch_results
203 .into_iter()
204 .zip(req.searches.iter())
205 .map(|(results, search)| {
206 let truncated: Vec<_> = results.into_iter().take(search.top_k).collect();
207 build_search_response(truncated)
208 })
209 .collect()
210}