Skip to main content

velesdb_server/handlers/query/
mod.rs

1//! VelesQL query execution handlers.
2
3pub mod explain;
4pub(crate) mod velesql_helpers;
5
6pub use explain::{__path_explain, explain};
7
8use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
9use std::sync::Arc;
10use velesdb_core::collection::search::query::projection;
11#[cfg(test)]
12use velesdb_core::velesql;
13use velesdb_core::velesql::{Query, SelectColumns};
14
15use crate::handlers::helpers::notify_query_timing;
16use crate::types::{
17    AggregationResponse, QueryRequest, QueryResponse, QueryResponseMeta, QueryType,
18    VELESQL_CONTRACT_VERSION,
19};
20use crate::AppState;
21
22use explain::condition_has_vector_search;
23use velesql_helpers::{parse_and_validate, velesql_collection_not_found, velesql_error};
24
25fn is_aggregation_query(select: &velesdb_core::velesql::SelectStatement) -> bool {
26    let has_aggs = match &select.columns {
27        SelectColumns::Aggregations(_) => true,
28        SelectColumns::Mixed { aggregations, .. } => !aggregations.is_empty(),
29        _ => false,
30    };
31    has_aggs || select.group_by.is_some()
32}
33
34fn aggregation_result_count(result: &serde_json::Value) -> usize {
35    match result {
36        serde_json::Value::Array(rows) => rows.len(),
37        serde_json::Value::Object(_) => 1,
38        _ => 0,
39    }
40}
41
42#[allow(deprecated)]
43fn execute_aggregation_query(
44    state: &Arc<AppState>,
45    collection_name: &str,
46    parsed: &Query,
47    params: &std::collections::HashMap<String, serde_json::Value>,
48    start: std::time::Instant,
49) -> axum::response::Response {
50    let collection = match state.db.get_collection(collection_name) {
51        Some(c) => c,
52        None => return velesql_collection_not_found(collection_name),
53    };
54
55    let result = match collection.execute_aggregate(parsed, params) {
56        Ok(r) => r,
57        Err(e) => {
58            return velesql_error(
59                StatusCode::UNPROCESSABLE_ENTITY,
60                "VELESQL_AGGREGATION_ERROR",
61                &e.to_string(),
62                "Verify GROUP BY/HAVING clauses and aggregate function arguments",
63                None,
64            )
65        }
66    };
67
68    let timing_ms = start.elapsed().as_secs_f64() * 1000.0;
69    notify_query_timing(state, collection_name, start);
70    let count = aggregation_result_count(&result);
71
72    Json(AggregationResponse {
73        result,
74        timing_ms,
75        meta: QueryResponseMeta {
76            velesql_contract_version: VELESQL_CONTRACT_VERSION.to_string(),
77            count,
78        },
79    })
80    .into_response()
81}
82
83/// Execute a VelesQL query.
84///
85/// BUG-1 FIX: Automatically detects aggregation queries (GROUP BY, COUNT, SUM, etc.)
86/// and routes them to execute_aggregate for proper handling.
87#[utoipa::path(
88    post,
89    path = "/query",
90    tag = "query",
91    request_body = QueryRequest,
92    responses(
93        (status = 200, description = "Query results", body = QueryResponse),
94        (status = 400, description = "Query syntax error", body = crate::types::QueryErrorResponse),
95        (status = 422, description = "Query validation/execution error", body = crate::types::VelesqlErrorResponse),
96        (status = 404, description = "Collection not found", body = crate::types::VelesqlErrorResponse)
97    )
98)]
99#[allow(clippy::unused_async, deprecated)]
100pub async fn query(
101    State(state): State<Arc<AppState>>,
102    Json(req): Json<QueryRequest>,
103) -> impl IntoResponse {
104    let start = std::time::Instant::now();
105
106    let parsed = match parse_and_validate(&req.query) {
107        Ok(q) => q,
108        Err(resp) => return resp,
109    };
110
111    let collection_name = match resolve_collection_name(&parsed, &req) {
112        Ok(name) => name,
113        Err(resp) => return resp,
114    };
115
116    // BUG-1 FIX: Detect aggregation queries and route to execute_aggregate
117    if is_aggregation_query(&parsed.select) {
118        return execute_aggregation_query(&state, &collection_name, &parsed, &req.params, start);
119    }
120
121    let results = match execute_standard_query(&state, &parsed, &collection_name, &req) {
122        Ok(r) => r,
123        Err(resp) => return resp,
124    };
125
126    build_query_response(
127        &state,
128        &collection_name,
129        start,
130        results,
131        &parsed.select.columns,
132    )
133}
134
135/// Determine the target collection from the parsed query and request body.
136#[allow(clippy::result_large_err)]
137fn resolve_collection_name(
138    parsed: &Query,
139    req: &QueryRequest,
140) -> Result<String, axum::response::Response> {
141    if parsed.is_match_query() {
142        req.collection
143            .as_ref()
144            .filter(|name| !name.is_empty())
145            .cloned()
146            .ok_or_else(|| {
147                velesql_error(
148                    StatusCode::UNPROCESSABLE_ENTITY,
149                    "VELESQL_MISSING_COLLECTION",
150                    "MATCH query via /query requires `collection` in request body",
151                    "Add `collection` to the /query JSON body or use /collections/{name}/match",
152                    Some(serde_json::json!({
153                        "field": "collection",
154                        "endpoint": "/query",
155                        "query_type": "MATCH"
156                    })),
157                )
158            })
159    } else {
160        Ok(parsed.select.from.clone())
161    }
162}
163
164/// Execute a standard (non-aggregation) query, dispatching MATCH vs SELECT.
165#[allow(deprecated, clippy::result_large_err)]
166fn execute_standard_query(
167    state: &Arc<AppState>,
168    parsed: &Query,
169    collection_name: &str,
170    req: &QueryRequest,
171) -> Result<Vec<velesdb_core::SearchResult>, axum::response::Response> {
172    let execute_result = if parsed.is_match_query() {
173        match state.db.get_collection(collection_name) {
174            Some(c) => c.execute_query(parsed, &req.params),
175            None => Err(velesdb_core::Error::CollectionNotFound(
176                collection_name.to_string(),
177            )),
178        }
179    } else {
180        state.db.execute_query(parsed, &req.params)
181    };
182
183    execute_result.map_err(|e| match e {
184        velesdb_core::Error::CollectionNotFound(name) => velesql_collection_not_found(&name),
185        other => velesql_error(
186            StatusCode::UNPROCESSABLE_ENTITY,
187            "VELESQL_EXECUTION_ERROR",
188            &other.to_string(),
189            "Validate query semantics and parameter types against the target collection",
190            None,
191        ),
192    })
193}
194
195/// Build the final query response with timing metrics and SQL projection.
196fn build_query_response(
197    state: &Arc<AppState>,
198    collection_name: &str,
199    start: std::time::Instant,
200    results: Vec<velesdb_core::SearchResult>,
201    select_columns: &SelectColumns,
202) -> axum::response::Response {
203    let timing_ms = start.elapsed().as_secs_f64() * 1000.0;
204    #[allow(clippy::cast_possible_truncation)]
205    // Reason: timing_ms is always < u64::MAX (query durations < 585 millennia)
206    let took_ms = timing_ms.round() as u64;
207    notify_query_timing(state, collection_name, start);
208    let projected = projection::project_results(&results, select_columns);
209    let rows_returned = projected.len();
210
211    Json(QueryResponse {
212        results: projected,
213        timing_ms,
214        took_ms,
215        rows_returned,
216        meta: QueryResponseMeta {
217            velesql_contract_version: VELESQL_CONTRACT_VERSION.to_string(),
218            count: rows_returned,
219        },
220    })
221    .into_response()
222}
223
224/// Execute an aggregation-only VelesQL query.
225///
226/// This endpoint is explicit and stable for GROUP BY / HAVING / aggregate workloads.
227#[utoipa::path(
228    post,
229    path = "/aggregate",
230    tag = "query",
231    request_body = QueryRequest,
232    responses(
233        (status = 200, description = "Aggregation results", body = AggregationResponse),
234        (status = 400, description = "Query syntax error", body = crate::types::QueryErrorResponse),
235        (status = 422, description = "Aggregation validation/execution error", body = crate::types::VelesqlErrorResponse),
236        (status = 404, description = "Collection not found", body = crate::types::VelesqlErrorResponse)
237    )
238)]
239#[allow(clippy::unused_async)]
240pub async fn aggregate(
241    State(state): State<Arc<AppState>>,
242    Json(req): Json<QueryRequest>,
243) -> impl IntoResponse {
244    let start = std::time::Instant::now();
245
246    let parsed = match parse_and_validate(&req.query) {
247        Ok(q) => q,
248        Err(resp) => return resp,
249    };
250
251    if parsed.is_match_query() || !is_aggregation_query(&parsed.select) {
252        return velesql_error(
253            StatusCode::UNPROCESSABLE_ENTITY,
254            "VELESQL_AGGREGATION_ERROR",
255            "Only aggregation queries are accepted on /aggregate",
256            "Use /query for row/search/graph queries; use /aggregate for GROUP BY/aggregate workloads.",
257            Some(serde_json::json!({ "endpoint": "/aggregate" })),
258        );
259    }
260
261    let collection_name = resolve_aggregate_collection(&parsed, &req);
262    let collection_name = match collection_name {
263        Ok(name) => name,
264        Err(resp) => return resp,
265    };
266
267    execute_aggregation_query(&state, &collection_name, &parsed, &req.params, start)
268}
269
270/// Resolve the collection name for an aggregation query.
271#[allow(clippy::result_large_err)]
272fn resolve_aggregate_collection(
273    parsed: &Query,
274    req: &QueryRequest,
275) -> Result<String, axum::response::Response> {
276    if !parsed.select.from.is_empty() {
277        return Ok(parsed.select.from.clone());
278    }
279    req.collection
280        .as_ref()
281        .filter(|name| !name.is_empty())
282        .cloned()
283        .ok_or_else(|| {
284            velesql_error(
285                StatusCode::UNPROCESSABLE_ENTITY,
286                "VELESQL_MISSING_COLLECTION",
287                "Aggregation query requires a FROM collection or request-body `collection`",
288                "Add FROM <collection> to query or set `collection` in request JSON",
289                Some(serde_json::json!({
290                    "field": "collection",
291                    "endpoint": "/aggregate"
292                })),
293            )
294        })
295}
296
297/// Detect query type from parsed AST (EPIC-052 US-006).
298///
299/// Priority order:
300/// 1. MATCH clause -> Graph
301/// 2. GROUP BY or aggregates -> Aggregation
302/// 3. Vector search -> Search
303/// 4. Default -> Rows
304#[allow(dead_code)] // Used in tests, will be used in unified handler
305pub fn detect_query_type(query: &Query) -> QueryType {
306    if query.is_match_query() {
307        return QueryType::Graph;
308    }
309
310    if is_aggregation_query(&query.select) {
311        return QueryType::Aggregation;
312    }
313
314    let has_vector = query
315        .select
316        .where_clause
317        .as_ref()
318        .map(condition_has_vector_search)
319        .unwrap_or(false);
320
321    if has_vector {
322        return QueryType::Search;
323    }
324
325    QueryType::Rows
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse `sql` and classify it; panics if the query does not parse.
    fn classify(sql: &str) -> QueryType {
        let parsed = velesql::Parser::parse(sql).unwrap();
        detect_query_type(&parsed)
    }

    #[test]
    fn test_detect_query_type_search() {
        let kind = classify("SELECT * FROM docs WHERE similarity(embedding, $v) > 0.8 LIMIT 10");
        assert_eq!(kind, QueryType::Search);
    }

    #[test]
    fn test_detect_query_type_aggregation() {
        let kind = classify("SELECT category, COUNT(*) FROM products GROUP BY category");
        assert_eq!(kind, QueryType::Aggregation);
    }

    #[test]
    fn test_detect_query_type_rows() {
        let kind = classify("SELECT name, price FROM products WHERE price > 100");
        assert_eq!(kind, QueryType::Rows);
    }

    #[test]
    fn test_detect_query_type_graph() {
        let kind = classify("MATCH (n:Person)-[:KNOWS]->(m) RETURN n.name, m.name LIMIT 10");
        assert_eq!(kind, QueryType::Graph);
    }

    #[test]
    fn test_detect_query_type_hybrid_vector_aggregation() {
        // A query with both a vector search and a GROUP BY is classified as
        // an aggregation, because that check runs first.
        let kind = classify(
            "SELECT category, COUNT(*) FROM docs WHERE similarity(embedding, $v) > 0.7 GROUP BY category",
        );
        assert_eq!(kind, QueryType::Aggregation);
    }
}
373}