1pub mod aggregation;
4pub mod explain;
5pub(crate) mod velesql_helpers;
6
7pub use aggregation::__path_aggregate;
8pub use aggregation::aggregate;
9pub use explain::{__path_explain, explain};
10
11use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
12use std::sync::Arc;
13use velesdb_core::collection::search::query::projection;
14#[cfg(test)]
15use velesdb_core::velesql;
16use velesdb_core::velesql::{
17 DdlStatement, DmlStatement, IntrospectionStatement, Query, SelectColumns,
18};
19
20use crate::handlers::helpers::notify_query_timing;
21use crate::types::{
22 QueryRequest, QueryResponse, QueryResponseMeta, QueryType, VELESQL_CONTRACT_VERSION,
23};
24use crate::AppState;
25
26use aggregation::{execute_aggregation_query, is_aggregation_query};
27use explain::condition_has_vector_search;
28use velesql_helpers::{parse_and_validate, velesql_collection_not_found, velesql_error};
29
30fn requires_mutation_dispatch(parsed: &Query) -> bool {
34 parsed.is_ddl_query()
35 || parsed.is_introspection_query()
36 || parsed.is_admin_query()
37 || parsed.is_train()
38 || is_ast_routed_dml(parsed)
39}
40
41fn is_ast_routed_dml(parsed: &Query) -> bool {
49 matches!(
50 parsed.dml,
51 Some(
52 DmlStatement::InsertEdge(_)
53 | DmlStatement::Delete(_)
54 | DmlStatement::DeleteEdge(_)
55 | DmlStatement::SelectEdges(_)
56 | DmlStatement::InsertNode(_)
57 )
58 )
59}
60
61#[utoipa::path(
69 post,
70 path = "/query",
71 tag = "query",
72 request_body = QueryRequest,
73 responses(
74 (status = 200, description = "Query results", body = QueryResponse),
75 (status = 400, description = "Query syntax error", body = crate::types::QueryErrorResponse),
76 (status = 422, description = "Query validation/execution error", body = crate::types::VelesqlErrorResponse),
77 (status = 404, description = "Collection not found", body = crate::types::VelesqlErrorResponse)
78 )
79)]
80#[allow(clippy::unused_async)]
81pub async fn query(
82 State(state): State<Arc<AppState>>,
83 Json(req): Json<QueryRequest>,
84) -> impl IntoResponse {
85 let start = std::time::Instant::now();
86 state.operational_metrics.inc_queries();
87
88 let parsed = match parse_and_validate(&req.query) {
89 Ok(q) => q,
90 Err(resp) => {
91 state.operational_metrics.inc_errors();
92 return resp;
93 }
94 };
95
96 if requires_mutation_dispatch(&parsed) {
100 return execute_mutation_query(&state, &parsed, &req.params, start);
101 }
102
103 let collection_name = match resolve_collection_name(&parsed, &req) {
104 Ok(name) => name,
105 Err(resp) => {
106 state.operational_metrics.inc_errors();
107 return resp;
108 }
109 };
110
111 if is_aggregation_query(&parsed.select) {
113 return execute_aggregation_query(&state, &collection_name, &parsed, &req.params, start);
114 }
115
116 let results = match execute_standard_query(&state, &parsed, &collection_name, &req) {
117 Ok(r) => r,
118 Err(resp) => {
119 state.operational_metrics.inc_errors();
120 return resp;
121 }
122 };
123
124 build_query_response(
125 &state,
126 &collection_name,
127 start,
128 results,
129 &parsed.select.columns,
130 )
131}
132
133fn extract_mutation_collection_name(parsed: &Query) -> String {
151 if let Some(name) = extract_ddl_collection(parsed) {
152 return name;
153 }
154 if let Some(name) = extract_dml_collection(parsed) {
155 return name;
156 }
157 if let Some(ref intro) = parsed.introspection {
158 return match intro {
159 IntrospectionStatement::DescribeCollection(d) => d.name.clone(),
160 IntrospectionStatement::ShowCollections | IntrospectionStatement::Explain(_) => {
161 "_system".to_string()
162 }
163 other => warn_unknown_velesql_variant("IntrospectionStatement", other),
164 };
165 }
166 if let Some(ref admin) = parsed.admin {
167 return match admin {
168 velesdb_core::velesql::AdminStatement::Flush(f) => f
169 .collection
170 .clone()
171 .unwrap_or_else(|| "_system".to_string()),
172 other => warn_unknown_velesql_variant("AdminStatement", other),
173 };
174 }
175 if let Some(ref train) = parsed.train {
176 return train.collection.clone();
177 }
178 "_system".to_string()
179}
180
181fn extract_ddl_collection(parsed: &Query) -> Option<String> {
183 parsed.ddl.as_ref().and_then(|ddl| match ddl {
184 DdlStatement::CreateCollection(s) => Some(s.name.clone()),
185 DdlStatement::DropCollection(s) => Some(s.name.clone()),
186 DdlStatement::CreateIndex(s) => Some(s.collection.clone()),
187 DdlStatement::DropIndex(s) => Some(s.collection.clone()),
188 DdlStatement::Analyze(s) => Some(s.collection.clone()),
189 DdlStatement::Truncate(s) => Some(s.collection.clone()),
190 DdlStatement::AlterCollection(s) => Some(s.collection.clone()),
191 other => {
192 warn_unknown_velesql_variant("DdlStatement", other);
193 None
194 }
195 })
196}
197
198fn extract_dml_collection(parsed: &Query) -> Option<String> {
200 parsed.dml.as_ref().and_then(|dml| match dml {
201 DmlStatement::Insert(s) | DmlStatement::Upsert(s) => Some(s.table.clone()),
202 DmlStatement::Update(s) => Some(s.table.clone()),
203 DmlStatement::InsertEdge(s) => Some(s.collection.clone()),
204 DmlStatement::Delete(s) => Some(s.table.clone()),
205 DmlStatement::DeleteEdge(s) => Some(s.collection.clone()),
206 DmlStatement::SelectEdges(s) => Some(s.collection.clone()),
207 DmlStatement::InsertNode(s) => Some(s.collection.clone()),
208 other => {
209 warn_unknown_velesql_variant("DmlStatement", other);
210 None
211 }
212 })
213}
214
215fn warn_unknown_velesql_variant<T: std::fmt::Debug>(kind: &'static str, variant: &T) -> String {
227 tracing::warn!(
228 target: "velesql.dispatch",
229 enum_kind = kind,
230 variant = ?variant,
231 "unknown VelesQL statement variant on telemetry extraction path; \
232 routing collection tag to _system — add the new variant to \
233 extract_mutation_collection_name in handlers/query/mod.rs"
234 );
235 "_system".to_string()
236}
237
238fn execute_mutation_query(
250 state: &Arc<AppState>,
251 parsed: &Query,
252 params: &std::collections::HashMap<String, serde_json::Value>,
253 start: std::time::Instant,
254) -> axum::response::Response {
255 match state.db.execute_query(parsed, params) {
256 Ok(results) => {
257 let coll_name = extract_mutation_collection_name(parsed);
258 notify_query_timing(state, &coll_name, start);
259 let elapsed = start.elapsed();
260 let timing_ms = elapsed.as_secs_f64() * 1000.0;
261 #[allow(clippy::cast_possible_truncation)]
262 let took_ms = timing_ms.round() as u64;
264 state
265 .query_duration_histogram
266 .observe(elapsed.as_secs_f64());
267 let projected = projection::project_results(&results, &parsed.select.columns);
268 let rows_returned = projected.len();
269 Json(QueryResponse {
270 results: projected,
271 timing_ms,
272 took_ms,
273 rows_returned,
274 meta: QueryResponseMeta {
275 velesql_contract_version: VELESQL_CONTRACT_VERSION.to_string(),
276 count: rows_returned,
277 },
278 })
279 .into_response()
280 }
281 Err(e) => {
282 state.operational_metrics.inc_errors();
283 velesql_error(
284 StatusCode::UNPROCESSABLE_ENTITY,
285 "VELESQL_MUTATION_ERROR",
286 &e.to_string(),
287 "Check collection name, statement syntax, and target existence",
288 None,
289 )
290 }
291 }
292}
293
294#[allow(clippy::result_large_err)]
296fn resolve_collection_name(
297 parsed: &Query,
298 req: &QueryRequest,
299) -> Result<String, axum::response::Response> {
300 if parsed.is_match_query() {
301 req.collection
302 .as_ref()
303 .filter(|name| !name.is_empty())
304 .cloned()
305 .ok_or_else(|| {
306 velesql_error(
307 StatusCode::UNPROCESSABLE_ENTITY,
308 "VELESQL_MISSING_COLLECTION",
309 "MATCH query via /query requires `collection` in request body",
310 "Add `collection` to the /query JSON body or use /collections/{name}/match",
311 Some(serde_json::json!({
312 "field": "collection",
313 "endpoint": "/query",
314 "query_type": "MATCH"
315 })),
316 )
317 })
318 } else {
319 Ok(parsed.select.from.clone())
320 }
321}
322
323#[allow(clippy::result_large_err)]
325fn execute_standard_query(
326 state: &Arc<AppState>,
327 parsed: &Query,
328 collection_name: &str,
329 req: &QueryRequest,
330) -> Result<Vec<velesdb_core::SearchResult>, axum::response::Response> {
331 let execute_result = if parsed.is_match_query() {
332 if let Some(vc) = state.db.get_vector_collection(collection_name) {
335 vc.execute_query(parsed, &req.params)
336 } else if let Some(gc) = state.db.get_graph_collection(collection_name) {
337 gc.execute_query(parsed, &req.params)
338 } else if let Some(mc) = state.db.get_metadata_collection(collection_name) {
339 mc.execute_query(parsed, &req.params)
340 } else {
341 Err(velesdb_core::Error::CollectionNotFound(
342 collection_name.to_string(),
343 ))
344 }
345 } else {
346 state.db.execute_query(parsed, &req.params)
347 };
348
349 execute_result.map_err(|e| match e {
350 velesdb_core::Error::CollectionNotFound(name) => velesql_collection_not_found(&name),
351 other => velesql_error(
352 StatusCode::UNPROCESSABLE_ENTITY,
353 "VELESQL_EXECUTION_ERROR",
354 &other.to_string(),
355 "Validate query semantics and parameter types against the target collection",
356 None,
357 ),
358 })
359}
360
361fn build_query_response(
363 state: &Arc<AppState>,
364 collection_name: &str,
365 start: std::time::Instant,
366 results: Vec<velesdb_core::SearchResult>,
367 select_columns: &SelectColumns,
368) -> axum::response::Response {
369 let elapsed = start.elapsed();
370 let timing_ms = elapsed.as_secs_f64() * 1000.0;
371 #[allow(clippy::cast_possible_truncation)]
372 let took_ms = timing_ms.round() as u64;
374 notify_query_timing(state, collection_name, start);
375 state
376 .query_duration_histogram
377 .observe(elapsed.as_secs_f64());
378 let projected = projection::project_results(&results, select_columns);
379 let rows_returned = projected.len();
380
381 Json(QueryResponse {
382 results: projected,
383 timing_ms,
384 took_ms,
385 rows_returned,
386 meta: QueryResponseMeta {
387 velesql_contract_version: VELESQL_CONTRACT_VERSION.to_string(),
388 count: rows_returned,
389 },
390 })
391 .into_response()
392}
393
394#[allow(dead_code)] pub fn detect_query_type(query: &Query) -> QueryType {
405 if query.is_ddl_query() {
406 return QueryType::Ddl;
407 }
408
409 if query.is_dml_query() {
410 return QueryType::Dml;
411 }
412
413 if query.is_match_query() {
414 return QueryType::Graph;
415 }
416
417 if is_aggregation_query(&query.select) {
418 return QueryType::Aggregation;
419 }
420
421 let has_vector = query
422 .select
423 .where_clause
424 .as_ref()
425 .map(condition_has_vector_search)
426 .unwrap_or(false);
427
428 if has_vector {
429 return QueryType::Search;
430 }
431
432 QueryType::Rows
433}
434
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `sql` (panicking on a parse failure) and classifies the result.
    fn classify(sql: &str) -> QueryType {
        let parsed = velesql::Parser::parse(sql).unwrap();
        detect_query_type(&parsed)
    }

    #[test]
    fn test_detect_query_type_search() {
        let kind = classify("SELECT * FROM docs WHERE similarity(embedding, $v) > 0.8 LIMIT 10");
        assert_eq!(kind, QueryType::Search);
    }

    #[test]
    fn test_detect_query_type_aggregation() {
        let kind = classify("SELECT category, COUNT(*) FROM products GROUP BY category");
        assert_eq!(kind, QueryType::Aggregation);
    }

    #[test]
    fn test_detect_query_type_rows() {
        let kind = classify("SELECT name, price FROM products WHERE price > 100");
        assert_eq!(kind, QueryType::Rows);
    }

    #[test]
    fn test_detect_query_type_graph() {
        let kind = classify("MATCH (n:Person)-[:KNOWS]->(m) RETURN n.name, m.name LIMIT 10");
        assert_eq!(kind, QueryType::Graph);
    }

    // Aggregation takes precedence over vector search when both are present.
    #[test]
    fn test_detect_query_type_hybrid_vector_aggregation() {
        let kind = classify(
            "SELECT category, COUNT(*) FROM docs WHERE similarity(embedding, $v) > 0.7 GROUP BY category",
        );
        assert_eq!(kind, QueryType::Aggregation);
    }

    #[test]
    fn test_detect_query_type_ddl_create() {
        let kind = classify("CREATE COLLECTION docs (dimension = 768, metric = 'cosine');");
        assert_eq!(kind, QueryType::Ddl);
    }

    #[test]
    fn test_detect_query_type_ddl_drop() {
        let kind = classify("DROP COLLECTION docs;");
        assert_eq!(kind, QueryType::Ddl);
    }

    #[test]
    fn test_detect_query_type_dml_insert_edge() {
        let kind = classify("INSERT EDGE INTO kg (source = 1, target = 2, label = 'KNOWS');");
        assert_eq!(kind, QueryType::Dml);
    }

    #[test]
    fn test_detect_query_type_dml_delete() {
        let kind = classify("DELETE FROM docs WHERE id = 1;");
        assert_eq!(kind, QueryType::Dml);
    }
}