velesdb_server/handlers/query/
mod.rs1pub mod aggregation;
4pub mod explain;
5pub(crate) mod velesql_helpers;
6
7pub use aggregation::__path_aggregate;
8pub use aggregation::aggregate;
9pub use explain::{__path_explain, explain};
10
11use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
12use std::sync::Arc;
13use velesdb_core::collection::search::query::projection;
14#[cfg(test)]
15use velesdb_core::velesql;
16use velesdb_core::velesql::{
17 DdlStatement, DmlStatement, IntrospectionStatement, Query, SelectColumns,
18};
19
20use crate::handlers::helpers::notify_query_timing;
21use crate::types::{
22 QueryRequest, QueryResponse, QueryResponseMeta, QueryType, VELESQL_CONTRACT_VERSION,
23};
24use crate::AppState;
25
26use aggregation::{execute_aggregation_query, is_aggregation_query};
27use explain::condition_has_vector_search;
28use velesql_helpers::{parse_and_validate, velesql_collection_not_found, velesql_error};
29
30fn requires_mutation_dispatch(parsed: &Query) -> bool {
34 parsed.is_ddl_query()
35 || parsed.is_introspection_query()
36 || parsed.is_admin_query()
37 || parsed.is_train()
38 || is_ast_routed_dml(parsed)
39}
40
41fn is_ast_routed_dml(parsed: &Query) -> bool {
49 matches!(
50 parsed.dml,
51 Some(
52 DmlStatement::InsertEdge(_)
53 | DmlStatement::Delete(_)
54 | DmlStatement::DeleteEdge(_)
55 | DmlStatement::SelectEdges(_)
56 | DmlStatement::InsertNode(_)
57 )
58 )
59}
60
61#[utoipa::path(
69 post,
70 path = "/query",
71 tag = "query",
72 request_body = QueryRequest,
73 responses(
74 (status = 200, description = "Query results", body = QueryResponse),
75 (status = 400, description = "Query syntax error", body = crate::types::QueryErrorResponse),
76 (status = 422, description = "Query validation/execution error", body = crate::types::VelesqlErrorResponse),
77 (status = 404, description = "Collection not found", body = crate::types::VelesqlErrorResponse)
78 )
79)]
80#[allow(clippy::unused_async)]
81pub async fn query(
82 State(state): State<Arc<AppState>>,
83 Json(req): Json<QueryRequest>,
84) -> impl IntoResponse {
85 let start = std::time::Instant::now();
86 state.operational_metrics.inc_queries();
87
88 let parsed = match parse_and_validate(&req.query) {
89 Ok(q) => q,
90 Err(resp) => {
91 state.operational_metrics.inc_errors();
92 return resp;
93 }
94 };
95
96 if requires_mutation_dispatch(&parsed) {
100 return execute_mutation_query(&state, &parsed, &req.params, start);
101 }
102
103 let collection_name = match resolve_collection_name(&parsed, &req) {
104 Ok(name) => name,
105 Err(resp) => {
106 state.operational_metrics.inc_errors();
107 return resp;
108 }
109 };
110
111 if is_aggregation_query(&parsed.select) {
113 return execute_aggregation_query(&state, &collection_name, &parsed, &req.params, start);
114 }
115
116 let results = match execute_standard_query(&state, &parsed, &collection_name, &req) {
117 Ok(r) => r,
118 Err(resp) => {
119 state.operational_metrics.inc_errors();
120 return resp;
121 }
122 };
123
124 build_query_response(
125 &state,
126 &collection_name,
127 start,
128 results,
129 &parsed.select.columns,
130 )
131}
132
133fn extract_mutation_collection_name(parsed: &Query) -> String {
151 extract_ddl_collection(parsed)
152 .or_else(|| extract_dml_collection(parsed))
153 .or_else(|| extract_introspection_collection(parsed))
154 .or_else(|| extract_admin_collection(parsed))
155 .or_else(|| parsed.train.as_ref().map(|train| train.collection.clone()))
156 .unwrap_or_else(|| "_system".to_string())
157}
158
159fn extract_introspection_collection(parsed: &Query) -> Option<String> {
161 parsed.introspection.as_ref().map(|intro| match intro {
162 IntrospectionStatement::DescribeCollection(d) => d.name.clone(),
163 IntrospectionStatement::ShowCollections | IntrospectionStatement::Explain(_) => {
164 "_system".to_string()
165 }
166 other => warn_unknown_velesql_variant("IntrospectionStatement", other),
167 })
168}
169
170fn extract_admin_collection(parsed: &Query) -> Option<String> {
172 parsed.admin.as_ref().map(|admin| match admin {
173 velesdb_core::velesql::AdminStatement::Flush(f) => f
174 .collection
175 .clone()
176 .unwrap_or_else(|| "_system".to_string()),
177 other => warn_unknown_velesql_variant("AdminStatement", other),
178 })
179}
180
181fn extract_ddl_collection(parsed: &Query) -> Option<String> {
183 parsed.ddl.as_ref().and_then(|ddl| match ddl {
184 DdlStatement::CreateCollection(s) => Some(s.name.clone()),
185 DdlStatement::DropCollection(s) => Some(s.name.clone()),
186 DdlStatement::CreateIndex(s) => Some(s.collection.clone()),
187 DdlStatement::DropIndex(s) => Some(s.collection.clone()),
188 DdlStatement::Analyze(s) => Some(s.collection.clone()),
189 DdlStatement::Truncate(s) => Some(s.collection.clone()),
190 DdlStatement::AlterCollection(s) => Some(s.collection.clone()),
191 other => {
192 warn_unknown_velesql_variant("DdlStatement", other);
193 None
194 }
195 })
196}
197
198fn extract_dml_collection(parsed: &Query) -> Option<String> {
200 parsed.dml.as_ref().and_then(|dml| match dml {
201 DmlStatement::Insert(s) | DmlStatement::Upsert(s) => Some(s.table.clone()),
202 DmlStatement::Update(s) => Some(s.table.clone()),
203 DmlStatement::InsertEdge(s) => Some(s.collection.clone()),
204 DmlStatement::Delete(s) => Some(s.table.clone()),
205 DmlStatement::DeleteEdge(s) => Some(s.collection.clone()),
206 DmlStatement::SelectEdges(s) => Some(s.collection.clone()),
207 DmlStatement::InsertNode(s) => Some(s.collection.clone()),
208 other => {
209 warn_unknown_velesql_variant("DmlStatement", other);
210 None
211 }
212 })
213}
214
215fn warn_unknown_velesql_variant<T: std::fmt::Debug>(kind: &'static str, variant: &T) -> String {
227 tracing::warn!(
228 target: "velesql.dispatch",
229 enum_kind = kind,
230 variant = ?variant,
231 "unknown VelesQL statement variant on telemetry extraction path; \
232 routing collection tag to _system — add the new variant to \
233 extract_mutation_collection_name in handlers/query/mod.rs"
234 );
235 "_system".to_string()
236}
237
238fn execute_mutation_query(
250 state: &Arc<AppState>,
251 parsed: &Query,
252 params: &std::collections::HashMap<String, serde_json::Value>,
253 start: std::time::Instant,
254) -> axum::response::Response {
255 match state.db.execute_query(parsed, params) {
256 Ok(results) => {
257 let coll_name = extract_mutation_collection_name(parsed);
258 build_query_response(state, &coll_name, start, results, &parsed.select.columns)
259 }
260 Err(e) => {
261 state.operational_metrics.inc_errors();
262 velesql_error(
263 StatusCode::UNPROCESSABLE_ENTITY,
264 "VELESQL_MUTATION_ERROR",
265 &e.to_string(),
266 "Check collection name, statement syntax, and target existence",
267 None,
268 )
269 }
270 }
271}
272
273#[allow(clippy::result_large_err)]
275fn resolve_collection_name(
276 parsed: &Query,
277 req: &QueryRequest,
278) -> Result<String, axum::response::Response> {
279 if parsed.is_match_query() {
280 req.collection
281 .as_ref()
282 .filter(|name| !name.is_empty())
283 .cloned()
284 .ok_or_else(|| {
285 velesql_error(
286 StatusCode::UNPROCESSABLE_ENTITY,
287 "VELESQL_MISSING_COLLECTION",
288 "MATCH query via /query requires `collection` in request body",
289 "Add `collection` to the /query JSON body or use /collections/{name}/match",
290 Some(serde_json::json!({
291 "field": "collection",
292 "endpoint": "/query",
293 "query_type": "MATCH"
294 })),
295 )
296 })
297 } else {
298 Ok(parsed.select.from.clone())
299 }
300}
301
302#[allow(clippy::result_large_err)]
304fn execute_standard_query(
305 state: &Arc<AppState>,
306 parsed: &Query,
307 collection_name: &str,
308 req: &QueryRequest,
309) -> Result<Vec<velesdb_core::SearchResult>, axum::response::Response> {
310 let execute_result = if parsed.is_match_query() {
311 let mut params = req.params.clone();
312 params
313 .entry("_collection".to_string())
314 .or_insert_with(|| serde_json::json!(collection_name));
315 state.db.execute_query(parsed, ¶ms)
316 } else {
317 state.db.execute_query(parsed, &req.params)
318 };
319
320 execute_result.map_err(|e| match e {
321 velesdb_core::Error::CollectionNotFound(name) => velesql_collection_not_found(&name),
322 other => velesql_error(
323 StatusCode::UNPROCESSABLE_ENTITY,
324 "VELESQL_EXECUTION_ERROR",
325 &other.to_string(),
326 "Validate query semantics and parameter types against the target collection",
327 None,
328 ),
329 })
330}
331
332fn build_query_response(
334 state: &Arc<AppState>,
335 collection_name: &str,
336 start: std::time::Instant,
337 results: Vec<velesdb_core::SearchResult>,
338 select_columns: &SelectColumns,
339) -> axum::response::Response {
340 let elapsed = start.elapsed();
341 let timing_ms = elapsed.as_secs_f64() * 1000.0;
342 #[allow(clippy::cast_possible_truncation)]
343 let took_ms = timing_ms.round() as u64;
345 notify_query_timing(state, collection_name, start);
346 state
347 .query_duration_histogram
348 .observe(elapsed.as_secs_f64());
349 let projected = projection::project_results(&results, select_columns);
350 let rows_returned = projected.len();
351
352 Json(QueryResponse {
353 results: projected,
354 timing_ms,
355 took_ms,
356 rows_returned,
357 meta: QueryResponseMeta {
358 velesql_contract_version: VELESQL_CONTRACT_VERSION.to_string(),
359 count: rows_returned,
360 },
361 })
362 .into_response()
363}
364
365#[allow(dead_code)] pub fn detect_query_type(query: &Query) -> QueryType {
376 if query.is_ddl_query() {
377 return QueryType::Ddl;
378 }
379
380 if query.is_dml_query() {
381 return QueryType::Dml;
382 }
383
384 if query.is_match_query() {
385 return QueryType::Graph;
386 }
387
388 if is_aggregation_query(&query.select) {
389 return QueryType::Aggregation;
390 }
391
392 let has_vector = query
393 .select
394 .where_clause
395 .as_ref()
396 .map(condition_has_vector_search)
397 .unwrap_or(false);
398
399 if has_vector {
400 return QueryType::Search;
401 }
402
403 QueryType::Rows
404}
405
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `sql` (panicking on a parse error) and classifies the result.
    fn classify(sql: &str) -> QueryType {
        let parsed = velesql::Parser::parse(sql).unwrap();
        detect_query_type(&parsed)
    }

    #[test]
    fn test_detect_query_type_search() {
        let kind = classify("SELECT * FROM docs WHERE similarity(embedding, $v) > 0.8 LIMIT 10");
        assert_eq!(kind, QueryType::Search);
    }

    #[test]
    fn test_detect_query_type_aggregation() {
        let kind = classify("SELECT category, COUNT(*) FROM products GROUP BY category");
        assert_eq!(kind, QueryType::Aggregation);
    }

    #[test]
    fn test_detect_query_type_rows() {
        let kind = classify("SELECT name, price FROM products WHERE price > 100");
        assert_eq!(kind, QueryType::Rows);
    }

    #[test]
    fn test_detect_query_type_graph() {
        let kind = classify("MATCH (n:Person)-[:KNOWS]->(m) RETURN n.name, m.name LIMIT 10");
        assert_eq!(kind, QueryType::Graph);
    }

    #[test]
    fn test_detect_query_type_hybrid_vector_aggregation() {
        // Aggregation is checked before vector search, so it takes precedence here.
        let kind = classify(
            "SELECT category, COUNT(*) FROM docs WHERE similarity(embedding, $v) > 0.7 GROUP BY category",
        );
        assert_eq!(kind, QueryType::Aggregation);
    }

    #[test]
    fn test_detect_query_type_ddl_create() {
        let kind = classify("CREATE COLLECTION docs (dimension = 768, metric = 'cosine');");
        assert_eq!(kind, QueryType::Ddl);
    }

    #[test]
    fn test_detect_query_type_ddl_drop() {
        let kind = classify("DROP COLLECTION docs;");
        assert_eq!(kind, QueryType::Ddl);
    }

    #[test]
    fn test_detect_query_type_dml_insert_edge() {
        let kind = classify("INSERT EDGE INTO kg (source = 1, target = 2, label = 'KNOWS');");
        assert_eq!(kind, QueryType::Dml);
    }

    #[test]
    fn test_detect_query_type_dml_delete() {
        let kind = classify("DELETE FROM docs WHERE id = 1;");
        assert_eq!(kind, QueryType::Dml);
    }
}