pub mod aggregation;
pub mod explain;
pub(crate) mod velesql_helpers;
pub use aggregation::__path_aggregate;
pub use aggregation::aggregate;
pub use explain::{__path_explain, explain};
use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
use std::sync::Arc;
use velesdb_core::collection::search::query::projection;
#[cfg(test)]
use velesdb_core::velesql;
use velesdb_core::velesql::{
DdlStatement, DmlStatement, IntrospectionStatement, Query, SelectColumns,
};
use crate::handlers::helpers::notify_query_timing;
use crate::types::{
QueryRequest, QueryResponse, QueryResponseMeta, QueryType, VELESQL_CONTRACT_VERSION,
};
use crate::AppState;
use aggregation::{execute_aggregation_query, is_aggregation_query};
use explain::condition_has_vector_search;
use velesql_helpers::{parse_and_validate, velesql_collection_not_found, velesql_error};
fn requires_mutation_dispatch(parsed: &Query) -> bool {
parsed.is_ddl_query()
|| parsed.is_introspection_query()
|| parsed.is_admin_query()
|| parsed.is_train()
|| is_ast_routed_dml(parsed)
}
fn is_ast_routed_dml(parsed: &Query) -> bool {
matches!(
parsed.dml,
Some(
DmlStatement::InsertEdge(_)
| DmlStatement::Delete(_)
| DmlStatement::DeleteEdge(_)
| DmlStatement::SelectEdges(_)
| DmlStatement::InsertNode(_)
)
)
}
#[utoipa::path(
post,
path = "/query",
tag = "query",
request_body = QueryRequest,
responses(
(status = 200, description = "Query results", body = QueryResponse),
(status = 400, description = "Query syntax error", body = crate::types::QueryErrorResponse),
(status = 422, description = "Query validation/execution error", body = crate::types::VelesqlErrorResponse),
(status = 404, description = "Collection not found", body = crate::types::VelesqlErrorResponse)
)
)]
#[allow(clippy::unused_async)]
pub async fn query(
State(state): State<Arc<AppState>>,
Json(req): Json<QueryRequest>,
) -> impl IntoResponse {
let start = std::time::Instant::now();
state.operational_metrics.inc_queries();
let parsed = match parse_and_validate(&req.query) {
Ok(q) => q,
Err(resp) => {
state.operational_metrics.inc_errors();
return resp;
}
};
if requires_mutation_dispatch(&parsed) {
return execute_mutation_query(&state, &parsed, &req.params, start);
}
let collection_name = match resolve_collection_name(&parsed, &req) {
Ok(name) => name,
Err(resp) => {
state.operational_metrics.inc_errors();
return resp;
}
};
if is_aggregation_query(&parsed.select) {
return execute_aggregation_query(&state, &collection_name, &parsed, &req.params, start);
}
let results = match execute_standard_query(&state, &parsed, &collection_name, &req) {
Ok(r) => r,
Err(resp) => {
state.operational_metrics.inc_errors();
return resp;
}
};
build_query_response(
&state,
&collection_name,
start,
results,
&parsed.select.columns,
)
}
fn extract_mutation_collection_name(parsed: &Query) -> String {
if let Some(name) = extract_ddl_collection(parsed) {
return name;
}
if let Some(name) = extract_dml_collection(parsed) {
return name;
}
if let Some(ref intro) = parsed.introspection {
return match intro {
IntrospectionStatement::DescribeCollection(d) => d.name.clone(),
IntrospectionStatement::ShowCollections | IntrospectionStatement::Explain(_) => {
"_system".to_string()
}
other => warn_unknown_velesql_variant("IntrospectionStatement", other),
};
}
if let Some(ref admin) = parsed.admin {
return match admin {
velesdb_core::velesql::AdminStatement::Flush(f) => f
.collection
.clone()
.unwrap_or_else(|| "_system".to_string()),
other => warn_unknown_velesql_variant("AdminStatement", other),
};
}
if let Some(ref train) = parsed.train {
return train.collection.clone();
}
"_system".to_string()
}
fn extract_ddl_collection(parsed: &Query) -> Option<String> {
parsed.ddl.as_ref().and_then(|ddl| match ddl {
DdlStatement::CreateCollection(s) => Some(s.name.clone()),
DdlStatement::DropCollection(s) => Some(s.name.clone()),
DdlStatement::CreateIndex(s) => Some(s.collection.clone()),
DdlStatement::DropIndex(s) => Some(s.collection.clone()),
DdlStatement::Analyze(s) => Some(s.collection.clone()),
DdlStatement::Truncate(s) => Some(s.collection.clone()),
DdlStatement::AlterCollection(s) => Some(s.collection.clone()),
other => {
warn_unknown_velesql_variant("DdlStatement", other);
None
}
})
}
fn extract_dml_collection(parsed: &Query) -> Option<String> {
parsed.dml.as_ref().and_then(|dml| match dml {
DmlStatement::Insert(s) | DmlStatement::Upsert(s) => Some(s.table.clone()),
DmlStatement::Update(s) => Some(s.table.clone()),
DmlStatement::InsertEdge(s) => Some(s.collection.clone()),
DmlStatement::Delete(s) => Some(s.table.clone()),
DmlStatement::DeleteEdge(s) => Some(s.collection.clone()),
DmlStatement::SelectEdges(s) => Some(s.collection.clone()),
DmlStatement::InsertNode(s) => Some(s.collection.clone()),
other => {
warn_unknown_velesql_variant("DmlStatement", other);
None
}
})
}
fn warn_unknown_velesql_variant<T: std::fmt::Debug>(kind: &'static str, variant: &T) -> String {
tracing::warn!(
target: "velesql.dispatch",
enum_kind = kind,
variant = ?variant,
"unknown VelesQL statement variant on telemetry extraction path; \
routing collection tag to _system — add the new variant to \
extract_mutation_collection_name in handlers/query/mod.rs"
);
"_system".to_string()
}
fn execute_mutation_query(
state: &Arc<AppState>,
parsed: &Query,
params: &std::collections::HashMap<String, serde_json::Value>,
start: std::time::Instant,
) -> axum::response::Response {
match state.db.execute_query(parsed, params) {
Ok(results) => {
let coll_name = extract_mutation_collection_name(parsed);
notify_query_timing(state, &coll_name, start);
let elapsed = start.elapsed();
let timing_ms = elapsed.as_secs_f64() * 1000.0;
#[allow(clippy::cast_possible_truncation)]
let took_ms = timing_ms.round() as u64;
state
.query_duration_histogram
.observe(elapsed.as_secs_f64());
let projected = projection::project_results(&results, &parsed.select.columns);
let rows_returned = projected.len();
Json(QueryResponse {
results: projected,
timing_ms,
took_ms,
rows_returned,
meta: QueryResponseMeta {
velesql_contract_version: VELESQL_CONTRACT_VERSION.to_string(),
count: rows_returned,
},
})
.into_response()
}
Err(e) => {
state.operational_metrics.inc_errors();
velesql_error(
StatusCode::UNPROCESSABLE_ENTITY,
"VELESQL_MUTATION_ERROR",
&e.to_string(),
"Check collection name, statement syntax, and target existence",
None,
)
}
}
}
#[allow(clippy::result_large_err)]
fn resolve_collection_name(
parsed: &Query,
req: &QueryRequest,
) -> Result<String, axum::response::Response> {
if parsed.is_match_query() {
req.collection
.as_ref()
.filter(|name| !name.is_empty())
.cloned()
.ok_or_else(|| {
velesql_error(
StatusCode::UNPROCESSABLE_ENTITY,
"VELESQL_MISSING_COLLECTION",
"MATCH query via /query requires `collection` in request body",
"Add `collection` to the /query JSON body or use /collections/{name}/match",
Some(serde_json::json!({
"field": "collection",
"endpoint": "/query",
"query_type": "MATCH"
})),
)
})
} else {
Ok(parsed.select.from.clone())
}
}
#[allow(clippy::result_large_err)]
fn execute_standard_query(
state: &Arc<AppState>,
parsed: &Query,
collection_name: &str,
req: &QueryRequest,
) -> Result<Vec<velesdb_core::SearchResult>, axum::response::Response> {
let execute_result = if parsed.is_match_query() {
if let Some(vc) = state.db.get_vector_collection(collection_name) {
vc.execute_query(parsed, &req.params)
} else if let Some(gc) = state.db.get_graph_collection(collection_name) {
gc.execute_query(parsed, &req.params)
} else if let Some(mc) = state.db.get_metadata_collection(collection_name) {
mc.execute_query(parsed, &req.params)
} else {
Err(velesdb_core::Error::CollectionNotFound(
collection_name.to_string(),
))
}
} else {
state.db.execute_query(parsed, &req.params)
};
execute_result.map_err(|e| match e {
velesdb_core::Error::CollectionNotFound(name) => velesql_collection_not_found(&name),
other => velesql_error(
StatusCode::UNPROCESSABLE_ENTITY,
"VELESQL_EXECUTION_ERROR",
&other.to_string(),
"Validate query semantics and parameter types against the target collection",
None,
),
})
}
fn build_query_response(
state: &Arc<AppState>,
collection_name: &str,
start: std::time::Instant,
results: Vec<velesdb_core::SearchResult>,
select_columns: &SelectColumns,
) -> axum::response::Response {
let elapsed = start.elapsed();
let timing_ms = elapsed.as_secs_f64() * 1000.0;
#[allow(clippy::cast_possible_truncation)]
let took_ms = timing_ms.round() as u64;
notify_query_timing(state, collection_name, start);
state
.query_duration_histogram
.observe(elapsed.as_secs_f64());
let projected = projection::project_results(&results, select_columns);
let rows_returned = projected.len();
Json(QueryResponse {
results: projected,
timing_ms,
took_ms,
rows_returned,
meta: QueryResponseMeta {
velesql_contract_version: VELESQL_CONTRACT_VERSION.to_string(),
count: rows_returned,
},
})
.into_response()
}
#[allow(dead_code)] pub fn detect_query_type(query: &Query) -> QueryType {
if query.is_ddl_query() {
return QueryType::Ddl;
}
if query.is_dml_query() {
return QueryType::Dml;
}
if query.is_match_query() {
return QueryType::Graph;
}
if is_aggregation_query(&query.select) {
return QueryType::Aggregation;
}
let has_vector = query
.select
.where_clause
.as_ref()
.map(condition_has_vector_search)
.unwrap_or(false);
if has_vector {
return QueryType::Search;
}
QueryType::Rows
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_detect_query_type_search() {
let parsed = velesql::Parser::parse(
"SELECT * FROM docs WHERE similarity(embedding, $v) > 0.8 LIMIT 10",
)
.unwrap();
assert_eq!(detect_query_type(&parsed), QueryType::Search);
}
#[test]
fn test_detect_query_type_aggregation() {
let parsed =
velesql::Parser::parse("SELECT category, COUNT(*) FROM products GROUP BY category")
.unwrap();
assert_eq!(detect_query_type(&parsed), QueryType::Aggregation);
}
#[test]
fn test_detect_query_type_rows() {
let parsed =
velesql::Parser::parse("SELECT name, price FROM products WHERE price > 100").unwrap();
assert_eq!(detect_query_type(&parsed), QueryType::Rows);
}
#[test]
fn test_detect_query_type_graph() {
let parsed =
velesql::Parser::parse("MATCH (n:Person)-[:KNOWS]->(m) RETURN n.name, m.name LIMIT 10")
.unwrap();
assert_eq!(detect_query_type(&parsed), QueryType::Graph);
}
#[test]
fn test_detect_query_type_hybrid_vector_aggregation() {
let parsed = velesql::Parser::parse(
"SELECT category, COUNT(*) FROM docs WHERE similarity(embedding, $v) > 0.7 GROUP BY category",
)
.unwrap();
assert_eq!(detect_query_type(&parsed), QueryType::Aggregation);
}
#[test]
fn test_detect_query_type_ddl_create() {
let parsed =
velesql::Parser::parse("CREATE COLLECTION docs (dimension = 768, metric = 'cosine');")
.unwrap();
assert_eq!(detect_query_type(&parsed), QueryType::Ddl);
}
#[test]
fn test_detect_query_type_ddl_drop() {
let parsed = velesql::Parser::parse("DROP COLLECTION docs;").unwrap();
assert_eq!(detect_query_type(&parsed), QueryType::Ddl);
}
#[test]
fn test_detect_query_type_dml_insert_edge() {
let parsed = velesql::Parser::parse(
"INSERT EDGE INTO kg (source = 1, target = 2, label = 'KNOWS');",
)
.unwrap();
assert_eq!(detect_query_type(&parsed), QueryType::Dml);
}
#[test]
fn test_detect_query_type_dml_delete() {
let parsed = velesql::Parser::parse("DELETE FROM docs WHERE id = 1;").unwrap();
assert_eq!(detect_query_type(&parsed), QueryType::Dml);
}
}