1pub mod explain;
4pub(crate) mod velesql_helpers;
5
6pub use explain::{__path_explain, explain};
7
8use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
9use std::sync::Arc;
10use velesdb_core::collection::search::query::projection;
11#[cfg(test)]
12use velesdb_core::velesql;
13use velesdb_core::velesql::{Query, SelectColumns};
14
15use crate::handlers::helpers::notify_query_timing;
16use crate::types::{
17 AggregationResponse, QueryRequest, QueryResponse, QueryResponseMeta, QueryType,
18 VELESQL_CONTRACT_VERSION,
19};
20use crate::AppState;
21
22use explain::condition_has_vector_search;
23use velesql_helpers::{parse_and_validate, velesql_collection_not_found, velesql_error};
24
25fn is_aggregation_query(select: &velesdb_core::velesql::SelectStatement) -> bool {
26 let has_aggs = match &select.columns {
27 SelectColumns::Aggregations(_) => true,
28 SelectColumns::Mixed { aggregations, .. } => !aggregations.is_empty(),
29 _ => false,
30 };
31 has_aggs || select.group_by.is_some()
32}
33
34fn aggregation_result_count(result: &serde_json::Value) -> usize {
35 match result {
36 serde_json::Value::Array(rows) => rows.len(),
37 serde_json::Value::Object(_) => 1,
38 _ => 0,
39 }
40}
41
42#[allow(deprecated)]
43fn execute_aggregation_query(
44 state: &Arc<AppState>,
45 collection_name: &str,
46 parsed: &Query,
47 params: &std::collections::HashMap<String, serde_json::Value>,
48 start: std::time::Instant,
49) -> axum::response::Response {
50 let collection = match state.db.get_collection(collection_name) {
51 Some(c) => c,
52 None => return velesql_collection_not_found(collection_name),
53 };
54
55 let result = match collection.execute_aggregate(parsed, params) {
56 Ok(r) => r,
57 Err(e) => {
58 return velesql_error(
59 StatusCode::UNPROCESSABLE_ENTITY,
60 "VELESQL_AGGREGATION_ERROR",
61 &e.to_string(),
62 "Verify GROUP BY/HAVING clauses and aggregate function arguments",
63 None,
64 )
65 }
66 };
67
68 let timing_ms = start.elapsed().as_secs_f64() * 1000.0;
69 notify_query_timing(state, collection_name, start);
70 let count = aggregation_result_count(&result);
71
72 Json(AggregationResponse {
73 result,
74 timing_ms,
75 meta: QueryResponseMeta {
76 velesql_contract_version: VELESQL_CONTRACT_VERSION.to_string(),
77 count,
78 },
79 })
80 .into_response()
81}
82
83#[utoipa::path(
88 post,
89 path = "/query",
90 tag = "query",
91 request_body = QueryRequest,
92 responses(
93 (status = 200, description = "Query results", body = QueryResponse),
94 (status = 400, description = "Query syntax error", body = crate::types::QueryErrorResponse),
95 (status = 422, description = "Query validation/execution error", body = crate::types::VelesqlErrorResponse),
96 (status = 404, description = "Collection not found", body = crate::types::VelesqlErrorResponse)
97 )
98)]
99#[allow(clippy::unused_async, deprecated)]
100pub async fn query(
101 State(state): State<Arc<AppState>>,
102 Json(req): Json<QueryRequest>,
103) -> impl IntoResponse {
104 let start = std::time::Instant::now();
105
106 let parsed = match parse_and_validate(&req.query) {
107 Ok(q) => q,
108 Err(resp) => return resp,
109 };
110
111 let collection_name = match resolve_collection_name(&parsed, &req) {
112 Ok(name) => name,
113 Err(resp) => return resp,
114 };
115
116 if is_aggregation_query(&parsed.select) {
118 return execute_aggregation_query(&state, &collection_name, &parsed, &req.params, start);
119 }
120
121 let results = match execute_standard_query(&state, &parsed, &collection_name, &req) {
122 Ok(r) => r,
123 Err(resp) => return resp,
124 };
125
126 build_query_response(
127 &state,
128 &collection_name,
129 start,
130 results,
131 &parsed.select.columns,
132 )
133}
134
135#[allow(clippy::result_large_err)]
137fn resolve_collection_name(
138 parsed: &Query,
139 req: &QueryRequest,
140) -> Result<String, axum::response::Response> {
141 if parsed.is_match_query() {
142 req.collection
143 .as_ref()
144 .filter(|name| !name.is_empty())
145 .cloned()
146 .ok_or_else(|| {
147 velesql_error(
148 StatusCode::UNPROCESSABLE_ENTITY,
149 "VELESQL_MISSING_COLLECTION",
150 "MATCH query via /query requires `collection` in request body",
151 "Add `collection` to the /query JSON body or use /collections/{name}/match",
152 Some(serde_json::json!({
153 "field": "collection",
154 "endpoint": "/query",
155 "query_type": "MATCH"
156 })),
157 )
158 })
159 } else {
160 Ok(parsed.select.from.clone())
161 }
162}
163
164#[allow(deprecated, clippy::result_large_err)]
166fn execute_standard_query(
167 state: &Arc<AppState>,
168 parsed: &Query,
169 collection_name: &str,
170 req: &QueryRequest,
171) -> Result<Vec<velesdb_core::SearchResult>, axum::response::Response> {
172 let execute_result = if parsed.is_match_query() {
173 match state.db.get_collection(collection_name) {
174 Some(c) => c.execute_query(parsed, &req.params),
175 None => Err(velesdb_core::Error::CollectionNotFound(
176 collection_name.to_string(),
177 )),
178 }
179 } else {
180 state.db.execute_query(parsed, &req.params)
181 };
182
183 execute_result.map_err(|e| match e {
184 velesdb_core::Error::CollectionNotFound(name) => velesql_collection_not_found(&name),
185 other => velesql_error(
186 StatusCode::UNPROCESSABLE_ENTITY,
187 "VELESQL_EXECUTION_ERROR",
188 &other.to_string(),
189 "Validate query semantics and parameter types against the target collection",
190 None,
191 ),
192 })
193}
194
195fn build_query_response(
197 state: &Arc<AppState>,
198 collection_name: &str,
199 start: std::time::Instant,
200 results: Vec<velesdb_core::SearchResult>,
201 select_columns: &SelectColumns,
202) -> axum::response::Response {
203 let timing_ms = start.elapsed().as_secs_f64() * 1000.0;
204 #[allow(clippy::cast_possible_truncation)]
205 let took_ms = timing_ms.round() as u64;
207 notify_query_timing(state, collection_name, start);
208 let projected = projection::project_results(&results, select_columns);
209 let rows_returned = projected.len();
210
211 Json(QueryResponse {
212 results: projected,
213 timing_ms,
214 took_ms,
215 rows_returned,
216 meta: QueryResponseMeta {
217 velesql_contract_version: VELESQL_CONTRACT_VERSION.to_string(),
218 count: rows_returned,
219 },
220 })
221 .into_response()
222}
223
224#[utoipa::path(
228 post,
229 path = "/aggregate",
230 tag = "query",
231 request_body = QueryRequest,
232 responses(
233 (status = 200, description = "Aggregation results", body = AggregationResponse),
234 (status = 400, description = "Query syntax error", body = crate::types::QueryErrorResponse),
235 (status = 422, description = "Aggregation validation/execution error", body = crate::types::VelesqlErrorResponse),
236 (status = 404, description = "Collection not found", body = crate::types::VelesqlErrorResponse)
237 )
238)]
239#[allow(clippy::unused_async)]
240pub async fn aggregate(
241 State(state): State<Arc<AppState>>,
242 Json(req): Json<QueryRequest>,
243) -> impl IntoResponse {
244 let start = std::time::Instant::now();
245
246 let parsed = match parse_and_validate(&req.query) {
247 Ok(q) => q,
248 Err(resp) => return resp,
249 };
250
251 if parsed.is_match_query() || !is_aggregation_query(&parsed.select) {
252 return velesql_error(
253 StatusCode::UNPROCESSABLE_ENTITY,
254 "VELESQL_AGGREGATION_ERROR",
255 "Only aggregation queries are accepted on /aggregate",
256 "Use /query for row/search/graph queries; use /aggregate for GROUP BY/aggregate workloads.",
257 Some(serde_json::json!({ "endpoint": "/aggregate" })),
258 );
259 }
260
261 let collection_name = resolve_aggregate_collection(&parsed, &req);
262 let collection_name = match collection_name {
263 Ok(name) => name,
264 Err(resp) => return resp,
265 };
266
267 execute_aggregation_query(&state, &collection_name, &parsed, &req.params, start)
268}
269
270#[allow(clippy::result_large_err)]
272fn resolve_aggregate_collection(
273 parsed: &Query,
274 req: &QueryRequest,
275) -> Result<String, axum::response::Response> {
276 if !parsed.select.from.is_empty() {
277 return Ok(parsed.select.from.clone());
278 }
279 req.collection
280 .as_ref()
281 .filter(|name| !name.is_empty())
282 .cloned()
283 .ok_or_else(|| {
284 velesql_error(
285 StatusCode::UNPROCESSABLE_ENTITY,
286 "VELESQL_MISSING_COLLECTION",
287 "Aggregation query requires a FROM collection or request-body `collection`",
288 "Add FROM <collection> to query or set `collection` in request JSON",
289 Some(serde_json::json!({
290 "field": "collection",
291 "endpoint": "/aggregate"
292 })),
293 )
294 })
295}
296
297#[allow(dead_code)] pub fn detect_query_type(query: &Query) -> QueryType {
306 if query.is_match_query() {
307 return QueryType::Graph;
308 }
309
310 if is_aggregation_query(&query.select) {
311 return QueryType::Aggregation;
312 }
313
314 let has_vector = query
315 .select
316 .where_clause
317 .as_ref()
318 .map(condition_has_vector_search)
319 .unwrap_or(false);
320
321 if has_vector {
322 return QueryType::Search;
323 }
324
325 QueryType::Rows
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `sql` and returns its detected query type; panics if parsing fails.
    fn classify(sql: &str) -> QueryType {
        let parsed = velesql::Parser::parse(sql).unwrap();
        detect_query_type(&parsed)
    }

    #[test]
    fn test_detect_query_type_search() {
        assert_eq!(
            classify("SELECT * FROM docs WHERE similarity(embedding, $v) > 0.8 LIMIT 10"),
            QueryType::Search
        );
    }

    #[test]
    fn test_detect_query_type_aggregation() {
        assert_eq!(
            classify("SELECT category, COUNT(*) FROM products GROUP BY category"),
            QueryType::Aggregation
        );
    }

    #[test]
    fn test_detect_query_type_rows() {
        assert_eq!(
            classify("SELECT name, price FROM products WHERE price > 100"),
            QueryType::Rows
        );
    }

    #[test]
    fn test_detect_query_type_graph() {
        assert_eq!(
            classify("MATCH (n:Person)-[:KNOWS]->(m) RETURN n.name, m.name LIMIT 10"),
            QueryType::Graph
        );
    }

    // Aggregation takes precedence over vector search in the classification.
    #[test]
    fn test_detect_query_type_hybrid_vector_aggregation() {
        assert_eq!(
            classify(
                "SELECT category, COUNT(*) FROM docs WHERE similarity(embedding, $v) > 0.7 GROUP BY category",
            ),
            QueryType::Aggregation
        );
    }
}