Skip to main content

velesdb_server/handlers/query/
mod.rs

1//! VelesQL query execution handlers.
2
3pub mod aggregation;
4pub mod explain;
5pub(crate) mod velesql_helpers;
6
7pub use aggregation::__path_aggregate;
8pub use aggregation::aggregate;
9pub use explain::{__path_explain, explain};
10
11use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
12use std::sync::Arc;
13use velesdb_core::collection::search::query::projection;
14#[cfg(test)]
15use velesdb_core::velesql;
16use velesdb_core::velesql::{
17    DdlStatement, DmlStatement, IntrospectionStatement, Query, SelectColumns,
18};
19
20use crate::handlers::helpers::notify_query_timing;
21use crate::types::{
22    QueryRequest, QueryResponse, QueryResponseMeta, QueryType, VELESQL_CONTRACT_VERSION,
23};
24use crate::AppState;
25
26use aggregation::{execute_aggregation_query, is_aggregation_query};
27use explain::condition_has_vector_search;
28use velesql_helpers::{parse_and_validate, velesql_collection_not_found, velesql_error};
29
30/// Returns `true` when the query should bypass collection resolution and go
31/// directly through `Database::execute_query` — DDL, introspection, admin,
32/// TRAIN, or graph/edge/delete DML that resolves its own collection from the AST.
33fn requires_mutation_dispatch(parsed: &Query) -> bool {
34    parsed.is_ddl_query()
35        || parsed.is_introspection_query()
36        || parsed.is_admin_query()
37        || parsed.is_train()
38        || is_ast_routed_dml(parsed)
39}
40
41/// Returns `true` for DML statements that resolve their collection name from
42/// the AST rather than from the request body's `FROM` clause:
43/// `INSERT EDGE`, `DELETE`, `DELETE EDGE`, `SELECT EDGES`, `INSERT NODE`.
44///
45/// `INSERT INTO`, `UPSERT`, and `UPDATE` return result rows and must flow
46/// through the standard query path (they use `stmt.table` which maps to
47/// the SELECT `FROM`).
48fn is_ast_routed_dml(parsed: &Query) -> bool {
49    matches!(
50        parsed.dml,
51        Some(
52            DmlStatement::InsertEdge(_)
53                | DmlStatement::Delete(_)
54                | DmlStatement::DeleteEdge(_)
55                | DmlStatement::SelectEdges(_)
56                | DmlStatement::InsertNode(_)
57        )
58    )
59}
60
61/// Execute a VelesQL query.
62///
63/// BUG-1 FIX: Automatically detects aggregation queries (GROUP BY, COUNT, SUM, etc.)
64/// and routes them to execute_aggregate for proper handling.
65///
66/// DDL statements (CREATE/DROP COLLECTION) are intercepted before collection
67/// resolution and dispatched directly through `Database::execute_query`.
68#[utoipa::path(
69    post,
70    path = "/query",
71    tag = "query",
72    request_body = QueryRequest,
73    responses(
74        (status = 200, description = "Query results", body = QueryResponse),
75        (status = 400, description = "Query syntax error", body = crate::types::QueryErrorResponse),
76        (status = 422, description = "Query validation/execution error", body = crate::types::VelesqlErrorResponse),
77        (status = 404, description = "Collection not found", body = crate::types::VelesqlErrorResponse)
78    )
79)]
80#[allow(clippy::unused_async)]
81pub async fn query(
82    State(state): State<Arc<AppState>>,
83    Json(req): Json<QueryRequest>,
84) -> impl IntoResponse {
85    let start = std::time::Instant::now();
86    state.operational_metrics.inc_queries();
87
88    let parsed = match parse_and_validate(&req.query) {
89        Ok(q) => q,
90        Err(resp) => {
91            state.operational_metrics.inc_errors();
92            return resp;
93        }
94    };
95
96    // DDL/Introspection/Admin/graph-mutation bypass: these extract collection from
97    // the SQL AST, not from the request body.  INSERT INTO, UPSERT, and UPDATE flow
98    // through the standard path because they return meaningful result rows.
99    if requires_mutation_dispatch(&parsed) {
100        return execute_mutation_query(&state, &parsed, &req.params, start);
101    }
102
103    let collection_name = match resolve_collection_name(&parsed, &req) {
104        Ok(name) => name,
105        Err(resp) => {
106            state.operational_metrics.inc_errors();
107            return resp;
108        }
109    };
110
111    // BUG-1 FIX: Detect aggregation queries and route to execute_aggregate
112    if is_aggregation_query(&parsed.select) {
113        return execute_aggregation_query(&state, &collection_name, &parsed, &req.params, start);
114    }
115
116    let results = match execute_standard_query(&state, &parsed, &collection_name, &req) {
117        Ok(r) => r,
118        Err(resp) => {
119            state.operational_metrics.inc_errors();
120            return resp;
121        }
122    };
123
124    build_query_response(
125        &state,
126        &collection_name,
127        start,
128        results,
129        &parsed.select.columns,
130    )
131}
132
133/// Extract the collection name targeted by a mutation/DDL/introspection query.
134///
135/// Returns `"_system"` for global operations that do not target a specific
136/// collection (SHOW COLLECTIONS, EXPLAIN, FLUSH without collection). This
137/// function is used exclusively for telemetry tagging — it is NOT on the
138/// HTTP routing path. The actual query execution goes through
139/// [`Database::execute_query`] which enforces its own dispatch rules.
140///
141/// # Forward compatibility
142///
143/// The four VelesQL statement enums (`IntrospectionStatement`,
144/// `AdminStatement`, `DdlStatement`, `DmlStatement`) are annotated
145/// `#[non_exhaustive]` so a wildcard `_` arm is mandatory here. To prevent
146/// silent telemetry degradation when a new core variant is added without
147/// an accompanying handler update, every wildcard arm now logs a
148/// `tracing::warn!` with a stable target so it is visible in CI logs and
149/// production observability.
150fn extract_mutation_collection_name(parsed: &Query) -> String {
151    if let Some(name) = extract_ddl_collection(parsed) {
152        return name;
153    }
154    if let Some(name) = extract_dml_collection(parsed) {
155        return name;
156    }
157    if let Some(ref intro) = parsed.introspection {
158        return match intro {
159            IntrospectionStatement::DescribeCollection(d) => d.name.clone(),
160            IntrospectionStatement::ShowCollections | IntrospectionStatement::Explain(_) => {
161                "_system".to_string()
162            }
163            other => warn_unknown_velesql_variant("IntrospectionStatement", other),
164        };
165    }
166    if let Some(ref admin) = parsed.admin {
167        return match admin {
168            velesdb_core::velesql::AdminStatement::Flush(f) => f
169                .collection
170                .clone()
171                .unwrap_or_else(|| "_system".to_string()),
172            other => warn_unknown_velesql_variant("AdminStatement", other),
173        };
174    }
175    if let Some(ref train) = parsed.train {
176        return train.collection.clone();
177    }
178    "_system".to_string()
179}
180
181/// Extract collection name from a DDL statement.
182fn extract_ddl_collection(parsed: &Query) -> Option<String> {
183    parsed.ddl.as_ref().and_then(|ddl| match ddl {
184        DdlStatement::CreateCollection(s) => Some(s.name.clone()),
185        DdlStatement::DropCollection(s) => Some(s.name.clone()),
186        DdlStatement::CreateIndex(s) => Some(s.collection.clone()),
187        DdlStatement::DropIndex(s) => Some(s.collection.clone()),
188        DdlStatement::Analyze(s) => Some(s.collection.clone()),
189        DdlStatement::Truncate(s) => Some(s.collection.clone()),
190        DdlStatement::AlterCollection(s) => Some(s.collection.clone()),
191        other => {
192            warn_unknown_velesql_variant("DdlStatement", other);
193            None
194        }
195    })
196}
197
198/// Extract collection name from a DML statement.
199fn extract_dml_collection(parsed: &Query) -> Option<String> {
200    parsed.dml.as_ref().and_then(|dml| match dml {
201        DmlStatement::Insert(s) | DmlStatement::Upsert(s) => Some(s.table.clone()),
202        DmlStatement::Update(s) => Some(s.table.clone()),
203        DmlStatement::InsertEdge(s) => Some(s.collection.clone()),
204        DmlStatement::Delete(s) => Some(s.table.clone()),
205        DmlStatement::DeleteEdge(s) => Some(s.collection.clone()),
206        DmlStatement::SelectEdges(s) => Some(s.collection.clone()),
207        DmlStatement::InsertNode(s) => Some(s.collection.clone()),
208        other => {
209            warn_unknown_velesql_variant("DmlStatement", other);
210            None
211        }
212    })
213}
214
215/// Logs a structured warning when an unknown VelesQL statement variant is
216/// encountered on the telemetry extraction path and returns the
217/// `"_system"` sentinel string. The structured `target` `velesql.dispatch`
218/// is stable so CI log aggregators and production observability can alert
219/// when a new core variant slips past the handler mapper without an
220/// accompanying update.
221///
222/// Note: this fallback only affects the collection-name tag attached to
223/// request telemetry. The HTTP response is still produced by the
224/// downstream `Database::execute_query` call which rejects truly
225/// unsupported statements with a proper error.
226fn warn_unknown_velesql_variant<T: std::fmt::Debug>(kind: &'static str, variant: &T) -> String {
227    tracing::warn!(
228        target: "velesql.dispatch",
229        enum_kind = kind,
230        variant = ?variant,
231        "unknown VelesQL statement variant on telemetry extraction path; \
232         routing collection tag to _system — add the new variant to \
233         extract_mutation_collection_name in handlers/query/mod.rs"
234    );
235    "_system".to_string()
236}
237
238/// Execute a DDL, graph/delete DML, introspection, admin, or TRAIN query.
239///
240/// DDL (CREATE/DROP/ALTER/ANALYZE/TRUNCATE), graph/delete DML mutations
241/// (INSERT EDGE, DELETE, DELETE EDGE, SELECT EDGES, INSERT NODE),
242/// introspection (SHOW/DESCRIBE/EXPLAIN), admin (FLUSH), and TRAIN
243/// statements extract collection names from the SQL AST — no FROM clause
244/// needed.
245///
246/// Results from `Database::execute_query` are propagated into the response
247/// so that introspection (SHOW, DESCRIBE, EXPLAIN), ANALYZE, and SELECT
248/// EDGES return their data to the caller.
249fn execute_mutation_query(
250    state: &Arc<AppState>,
251    parsed: &Query,
252    params: &std::collections::HashMap<String, serde_json::Value>,
253    start: std::time::Instant,
254) -> axum::response::Response {
255    match state.db.execute_query(parsed, params) {
256        Ok(results) => {
257            let coll_name = extract_mutation_collection_name(parsed);
258            notify_query_timing(state, &coll_name, start);
259            let elapsed = start.elapsed();
260            let timing_ms = elapsed.as_secs_f64() * 1000.0;
261            #[allow(clippy::cast_possible_truncation)]
262            // Reason: timing_ms is always < u64::MAX (query durations < 585 millennia)
263            let took_ms = timing_ms.round() as u64;
264            state
265                .query_duration_histogram
266                .observe(elapsed.as_secs_f64());
267            let projected = projection::project_results(&results, &parsed.select.columns);
268            let rows_returned = projected.len();
269            Json(QueryResponse {
270                results: projected,
271                timing_ms,
272                took_ms,
273                rows_returned,
274                meta: QueryResponseMeta {
275                    velesql_contract_version: VELESQL_CONTRACT_VERSION.to_string(),
276                    count: rows_returned,
277                },
278            })
279            .into_response()
280        }
281        Err(e) => {
282            state.operational_metrics.inc_errors();
283            velesql_error(
284                StatusCode::UNPROCESSABLE_ENTITY,
285                "VELESQL_MUTATION_ERROR",
286                &e.to_string(),
287                "Check collection name, statement syntax, and target existence",
288                None,
289            )
290        }
291    }
292}
293
294/// Determine the target collection from the parsed query and request body.
295#[allow(clippy::result_large_err)]
296fn resolve_collection_name(
297    parsed: &Query,
298    req: &QueryRequest,
299) -> Result<String, axum::response::Response> {
300    if parsed.is_match_query() {
301        req.collection
302            .as_ref()
303            .filter(|name| !name.is_empty())
304            .cloned()
305            .ok_or_else(|| {
306                velesql_error(
307                    StatusCode::UNPROCESSABLE_ENTITY,
308                    "VELESQL_MISSING_COLLECTION",
309                    "MATCH query via /query requires `collection` in request body",
310                    "Add `collection` to the /query JSON body or use /collections/{name}/match",
311                    Some(serde_json::json!({
312                        "field": "collection",
313                        "endpoint": "/query",
314                        "query_type": "MATCH"
315                    })),
316                )
317            })
318    } else {
319        Ok(parsed.select.from.clone())
320    }
321}
322
323/// Execute a standard (non-aggregation) query, dispatching MATCH vs SELECT.
324#[allow(clippy::result_large_err)]
325fn execute_standard_query(
326    state: &Arc<AppState>,
327    parsed: &Query,
328    collection_name: &str,
329    req: &QueryRequest,
330) -> Result<Vec<velesdb_core::SearchResult>, axum::response::Response> {
331    let execute_result = if parsed.is_match_query() {
332        // MATCH queries need a collection instance for execute_query.
333        // Route through typed registries: vector → graph → metadata.
334        if let Some(vc) = state.db.get_vector_collection(collection_name) {
335            vc.execute_query(parsed, &req.params)
336        } else if let Some(gc) = state.db.get_graph_collection(collection_name) {
337            gc.execute_query(parsed, &req.params)
338        } else if let Some(mc) = state.db.get_metadata_collection(collection_name) {
339            mc.execute_query(parsed, &req.params)
340        } else {
341            Err(velesdb_core::Error::CollectionNotFound(
342                collection_name.to_string(),
343            ))
344        }
345    } else {
346        state.db.execute_query(parsed, &req.params)
347    };
348
349    execute_result.map_err(|e| match e {
350        velesdb_core::Error::CollectionNotFound(name) => velesql_collection_not_found(&name),
351        other => velesql_error(
352            StatusCode::UNPROCESSABLE_ENTITY,
353            "VELESQL_EXECUTION_ERROR",
354            &other.to_string(),
355            "Validate query semantics and parameter types against the target collection",
356            None,
357        ),
358    })
359}
360
361/// Build the final query response with timing metrics and SQL projection.
362fn build_query_response(
363    state: &Arc<AppState>,
364    collection_name: &str,
365    start: std::time::Instant,
366    results: Vec<velesdb_core::SearchResult>,
367    select_columns: &SelectColumns,
368) -> axum::response::Response {
369    let elapsed = start.elapsed();
370    let timing_ms = elapsed.as_secs_f64() * 1000.0;
371    #[allow(clippy::cast_possible_truncation)]
372    // Reason: timing_ms is always < u64::MAX (query durations < 585 millennia)
373    let took_ms = timing_ms.round() as u64;
374    notify_query_timing(state, collection_name, start);
375    state
376        .query_duration_histogram
377        .observe(elapsed.as_secs_f64());
378    let projected = projection::project_results(&results, select_columns);
379    let rows_returned = projected.len();
380
381    Json(QueryResponse {
382        results: projected,
383        timing_ms,
384        took_ms,
385        rows_returned,
386        meta: QueryResponseMeta {
387            velesql_contract_version: VELESQL_CONTRACT_VERSION.to_string(),
388            count: rows_returned,
389        },
390    })
391    .into_response()
392}
393
394/// Detect query type from parsed AST (EPIC-052 US-006).
395///
396/// Priority order:
397/// 1. DDL (CREATE/DROP COLLECTION) -> Ddl
398/// 2. DML (INSERT/UPDATE/DELETE) -> Dml
399/// 3. MATCH clause -> Graph
400/// 4. GROUP BY or aggregates -> Aggregation
401/// 5. Vector search -> Search
402/// 6. Default -> Rows
403#[allow(dead_code)] // Used in tests, will be used in unified handler
404pub fn detect_query_type(query: &Query) -> QueryType {
405    if query.is_ddl_query() {
406        return QueryType::Ddl;
407    }
408
409    if query.is_dml_query() {
410        return QueryType::Dml;
411    }
412
413    if query.is_match_query() {
414        return QueryType::Graph;
415    }
416
417    if is_aggregation_query(&query.select) {
418        return QueryType::Aggregation;
419    }
420
421    let has_vector = query
422        .select
423        .where_clause
424        .as_ref()
425        .map(condition_has_vector_search)
426        .unwrap_or(false);
427
428    if has_vector {
429        return QueryType::Search;
430    }
431
432    QueryType::Rows
433}
434
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `sql` with the VelesQL parser and classifies the resulting AST.
    /// Panics (at the calling test's location) if the statement fails to parse.
    #[track_caller]
    fn classify(sql: &str) -> QueryType {
        let parsed = velesql::Parser::parse(sql).unwrap();
        detect_query_type(&parsed)
    }

    #[test]
    fn test_detect_query_type_search() {
        let kind = classify("SELECT * FROM docs WHERE similarity(embedding, $v) > 0.8 LIMIT 10");
        assert_eq!(kind, QueryType::Search);
    }

    #[test]
    fn test_detect_query_type_aggregation() {
        let kind = classify("SELECT category, COUNT(*) FROM products GROUP BY category");
        assert_eq!(kind, QueryType::Aggregation);
    }

    #[test]
    fn test_detect_query_type_rows() {
        let kind = classify("SELECT name, price FROM products WHERE price > 100");
        assert_eq!(kind, QueryType::Rows);
    }

    #[test]
    fn test_detect_query_type_graph() {
        let kind = classify("MATCH (n:Person)-[:KNOWS]->(m) RETURN n.name, m.name LIMIT 10");
        assert_eq!(kind, QueryType::Graph);
    }

    #[test]
    fn test_detect_query_type_hybrid_vector_aggregation() {
        // When both vector search and aggregation, aggregation takes priority
        let kind = classify(
            "SELECT category, COUNT(*) FROM docs WHERE similarity(embedding, $v) > 0.7 GROUP BY category",
        );
        assert_eq!(kind, QueryType::Aggregation);
    }

    #[test]
    fn test_detect_query_type_ddl_create() {
        let kind = classify("CREATE COLLECTION docs (dimension = 768, metric = 'cosine');");
        assert_eq!(kind, QueryType::Ddl);
    }

    #[test]
    fn test_detect_query_type_ddl_drop() {
        let kind = classify("DROP COLLECTION docs;");
        assert_eq!(kind, QueryType::Ddl);
    }

    #[test]
    fn test_detect_query_type_dml_insert_edge() {
        let kind = classify("INSERT EDGE INTO kg (source = 1, target = 2, label = 'KNOWS');");
        assert_eq!(kind, QueryType::Dml);
    }

    #[test]
    fn test_detect_query_type_dml_delete() {
        let kind = classify("DELETE FROM docs WHERE id = 1;");
        assert_eq!(kind, QueryType::Dml);
    }
}