Skip to main content

velesdb_server/handlers/query/
mod.rs

1//! VelesQL query execution handlers.
2
3pub mod aggregation;
4pub mod explain;
5pub(crate) mod velesql_helpers;
6
7pub use aggregation::__path_aggregate;
8pub use aggregation::aggregate;
9pub use explain::{__path_explain, explain};
10
11use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
12use std::sync::Arc;
13use velesdb_core::collection::search::query::projection;
14#[cfg(test)]
15use velesdb_core::velesql;
16use velesdb_core::velesql::{
17    DdlStatement, DmlStatement, IntrospectionStatement, Query, SelectColumns,
18};
19
20use crate::handlers::helpers::notify_query_timing;
21use crate::types::{
22    QueryRequest, QueryResponse, QueryResponseMeta, QueryType, VELESQL_CONTRACT_VERSION,
23};
24use crate::AppState;
25
26use aggregation::{execute_aggregation_query, is_aggregation_query};
27use explain::condition_has_vector_search;
28use velesql_helpers::{parse_and_validate, velesql_collection_not_found, velesql_error};
29
30/// Returns `true` when the query should bypass collection resolution and go
31/// directly through `Database::execute_query` — DDL, introspection, admin,
32/// TRAIN, or graph/edge/delete DML that resolves its own collection from the AST.
33fn requires_mutation_dispatch(parsed: &Query) -> bool {
34    parsed.is_ddl_query()
35        || parsed.is_introspection_query()
36        || parsed.is_admin_query()
37        || parsed.is_train()
38        || is_ast_routed_dml(parsed)
39}
40
41/// Returns `true` for DML statements that resolve their collection name from
42/// the AST rather than from the request body's `FROM` clause:
43/// `INSERT EDGE`, `DELETE`, `DELETE EDGE`, `SELECT EDGES`, `INSERT NODE`.
44///
45/// `INSERT INTO`, `UPSERT`, and `UPDATE` return result rows and must flow
46/// through the standard query path (they use `stmt.table` which maps to
47/// the SELECT `FROM`).
48fn is_ast_routed_dml(parsed: &Query) -> bool {
49    matches!(
50        parsed.dml,
51        Some(
52            DmlStatement::InsertEdge(_)
53                | DmlStatement::Delete(_)
54                | DmlStatement::DeleteEdge(_)
55                | DmlStatement::SelectEdges(_)
56                | DmlStatement::InsertNode(_)
57        )
58    )
59}
60
61/// Execute a VelesQL query.
62///
63/// BUG-1 FIX: Automatically detects aggregation queries (GROUP BY, COUNT, SUM, etc.)
64/// and routes them to execute_aggregate for proper handling.
65///
66/// DDL statements (CREATE/DROP COLLECTION) are intercepted before collection
67/// resolution and dispatched directly through `Database::execute_query`.
68#[utoipa::path(
69    post,
70    path = "/query",
71    tag = "query",
72    request_body = QueryRequest,
73    responses(
74        (status = 200, description = "Query results", body = QueryResponse),
75        (status = 400, description = "Query syntax error", body = crate::types::QueryErrorResponse),
76        (status = 422, description = "Query validation/execution error", body = crate::types::VelesqlErrorResponse),
77        (status = 404, description = "Collection not found", body = crate::types::VelesqlErrorResponse)
78    )
79)]
80#[allow(clippy::unused_async)]
81pub async fn query(
82    State(state): State<Arc<AppState>>,
83    Json(req): Json<QueryRequest>,
84) -> impl IntoResponse {
85    let start = std::time::Instant::now();
86    state.operational_metrics.inc_queries();
87
88    let parsed = match parse_and_validate(&req.query) {
89        Ok(q) => q,
90        Err(resp) => {
91            state.operational_metrics.inc_errors();
92            return resp;
93        }
94    };
95
96    // DDL/Introspection/Admin/graph-mutation bypass: these extract collection from
97    // the SQL AST, not from the request body.  INSERT INTO, UPSERT, and UPDATE flow
98    // through the standard path because they return meaningful result rows.
99    if requires_mutation_dispatch(&parsed) {
100        return execute_mutation_query(&state, &parsed, &req.params, start);
101    }
102
103    let collection_name = match resolve_collection_name(&parsed, &req) {
104        Ok(name) => name,
105        Err(resp) => {
106            state.operational_metrics.inc_errors();
107            return resp;
108        }
109    };
110
111    // BUG-1 FIX: Detect aggregation queries and route to execute_aggregate
112    if is_aggregation_query(&parsed.select) {
113        return execute_aggregation_query(&state, &collection_name, &parsed, &req.params, start);
114    }
115
116    let results = match execute_standard_query(&state, &parsed, &collection_name, &req) {
117        Ok(r) => r,
118        Err(resp) => {
119            state.operational_metrics.inc_errors();
120            return resp;
121        }
122    };
123
124    build_query_response(
125        &state,
126        &collection_name,
127        start,
128        results,
129        &parsed.select.columns,
130    )
131}
132
133/// Extract the collection name targeted by a mutation/DDL/introspection query.
134///
135/// Returns `"_system"` for global operations that do not target a specific
136/// collection (SHOW COLLECTIONS, EXPLAIN, FLUSH without collection). This
137/// function is used exclusively for telemetry tagging — it is NOT on the
138/// HTTP routing path. The actual query execution goes through
139/// [`Database::execute_query`] which enforces its own dispatch rules.
140///
141/// # Forward compatibility
142///
143/// The four VelesQL statement enums (`IntrospectionStatement`,
144/// `AdminStatement`, `DdlStatement`, `DmlStatement`) are annotated
145/// `#[non_exhaustive]` so a wildcard `_` arm is mandatory here. To prevent
146/// silent telemetry degradation when a new core variant is added without
147/// an accompanying handler update, every wildcard arm now logs a
148/// `tracing::warn!` with a stable target so it is visible in CI logs and
149/// production observability.
150fn extract_mutation_collection_name(parsed: &Query) -> String {
151    extract_ddl_collection(parsed)
152        .or_else(|| extract_dml_collection(parsed))
153        .or_else(|| extract_introspection_collection(parsed))
154        .or_else(|| extract_admin_collection(parsed))
155        .or_else(|| parsed.train.as_ref().map(|train| train.collection.clone()))
156        .unwrap_or_else(|| "_system".to_string())
157}
158
159/// Extract collection name from an introspection statement.
160fn extract_introspection_collection(parsed: &Query) -> Option<String> {
161    parsed.introspection.as_ref().map(|intro| match intro {
162        IntrospectionStatement::DescribeCollection(d) => d.name.clone(),
163        IntrospectionStatement::ShowCollections | IntrospectionStatement::Explain(_) => {
164            "_system".to_string()
165        }
166        other => warn_unknown_velesql_variant("IntrospectionStatement", other),
167    })
168}
169
170/// Extract collection name from an admin statement.
171fn extract_admin_collection(parsed: &Query) -> Option<String> {
172    parsed.admin.as_ref().map(|admin| match admin {
173        velesdb_core::velesql::AdminStatement::Flush(f) => f
174            .collection
175            .clone()
176            .unwrap_or_else(|| "_system".to_string()),
177        other => warn_unknown_velesql_variant("AdminStatement", other),
178    })
179}
180
181/// Extract collection name from a DDL statement.
182fn extract_ddl_collection(parsed: &Query) -> Option<String> {
183    parsed.ddl.as_ref().and_then(|ddl| match ddl {
184        DdlStatement::CreateCollection(s) => Some(s.name.clone()),
185        DdlStatement::DropCollection(s) => Some(s.name.clone()),
186        DdlStatement::CreateIndex(s) => Some(s.collection.clone()),
187        DdlStatement::DropIndex(s) => Some(s.collection.clone()),
188        DdlStatement::Analyze(s) => Some(s.collection.clone()),
189        DdlStatement::Truncate(s) => Some(s.collection.clone()),
190        DdlStatement::AlterCollection(s) => Some(s.collection.clone()),
191        other => {
192            warn_unknown_velesql_variant("DdlStatement", other);
193            None
194        }
195    })
196}
197
198/// Extract collection name from a DML statement.
199fn extract_dml_collection(parsed: &Query) -> Option<String> {
200    parsed.dml.as_ref().and_then(|dml| match dml {
201        DmlStatement::Insert(s) | DmlStatement::Upsert(s) => Some(s.table.clone()),
202        DmlStatement::Update(s) => Some(s.table.clone()),
203        DmlStatement::InsertEdge(s) => Some(s.collection.clone()),
204        DmlStatement::Delete(s) => Some(s.table.clone()),
205        DmlStatement::DeleteEdge(s) => Some(s.collection.clone()),
206        DmlStatement::SelectEdges(s) => Some(s.collection.clone()),
207        DmlStatement::InsertNode(s) => Some(s.collection.clone()),
208        other => {
209            warn_unknown_velesql_variant("DmlStatement", other);
210            None
211        }
212    })
213}
214
215/// Logs a structured warning when an unknown VelesQL statement variant is
216/// encountered on the telemetry extraction path and returns the
217/// `"_system"` sentinel string. The structured `target` `velesql.dispatch`
218/// is stable so CI log aggregators and production observability can alert
219/// when a new core variant slips past the handler mapper without an
220/// accompanying update.
221///
222/// Note: this fallback only affects the collection-name tag attached to
223/// request telemetry. The HTTP response is still produced by the
224/// downstream `Database::execute_query` call which rejects truly
225/// unsupported statements with a proper error.
226fn warn_unknown_velesql_variant<T: std::fmt::Debug>(kind: &'static str, variant: &T) -> String {
227    tracing::warn!(
228        target: "velesql.dispatch",
229        enum_kind = kind,
230        variant = ?variant,
231        "unknown VelesQL statement variant on telemetry extraction path; \
232         routing collection tag to _system — add the new variant to \
233         extract_mutation_collection_name in handlers/query/mod.rs"
234    );
235    "_system".to_string()
236}
237
238/// Execute a DDL, graph/delete DML, introspection, admin, or TRAIN query.
239///
240/// DDL (CREATE/DROP/ALTER/ANALYZE/TRUNCATE), graph/delete DML mutations
241/// (INSERT EDGE, DELETE, DELETE EDGE, SELECT EDGES, INSERT NODE),
242/// introspection (SHOW/DESCRIBE/EXPLAIN), admin (FLUSH), and TRAIN
243/// statements extract collection names from the SQL AST — no FROM clause
244/// needed.
245///
246/// Results from `Database::execute_query` are propagated into the response
247/// so that introspection (SHOW, DESCRIBE, EXPLAIN), ANALYZE, and SELECT
248/// EDGES return their data to the caller.
249fn execute_mutation_query(
250    state: &Arc<AppState>,
251    parsed: &Query,
252    params: &std::collections::HashMap<String, serde_json::Value>,
253    start: std::time::Instant,
254) -> axum::response::Response {
255    match state.db.execute_query(parsed, params) {
256        Ok(results) => {
257            let coll_name = extract_mutation_collection_name(parsed);
258            build_query_response(state, &coll_name, start, results, &parsed.select.columns)
259        }
260        Err(e) => {
261            state.operational_metrics.inc_errors();
262            velesql_error(
263                StatusCode::UNPROCESSABLE_ENTITY,
264                "VELESQL_MUTATION_ERROR",
265                &e.to_string(),
266                "Check collection name, statement syntax, and target existence",
267                None,
268            )
269        }
270    }
271}
272
273/// Determine the target collection from the parsed query and request body.
274#[allow(clippy::result_large_err)]
275fn resolve_collection_name(
276    parsed: &Query,
277    req: &QueryRequest,
278) -> Result<String, axum::response::Response> {
279    if parsed.is_match_query() {
280        req.collection
281            .as_ref()
282            .filter(|name| !name.is_empty())
283            .cloned()
284            .ok_or_else(|| {
285                velesql_error(
286                    StatusCode::UNPROCESSABLE_ENTITY,
287                    "VELESQL_MISSING_COLLECTION",
288                    "MATCH query via /query requires `collection` in request body",
289                    "Add `collection` to the /query JSON body or use /collections/{name}/match",
290                    Some(serde_json::json!({
291                        "field": "collection",
292                        "endpoint": "/query",
293                        "query_type": "MATCH"
294                    })),
295                )
296            })
297    } else {
298        Ok(parsed.select.from.clone())
299    }
300}
301
302/// Execute a standard (non-aggregation) query, dispatching MATCH vs SELECT.
303#[allow(clippy::result_large_err)]
304fn execute_standard_query(
305    state: &Arc<AppState>,
306    parsed: &Query,
307    collection_name: &str,
308    req: &QueryRequest,
309) -> Result<Vec<velesdb_core::SearchResult>, axum::response::Response> {
310    let execute_result = if parsed.is_match_query() {
311        let mut params = req.params.clone();
312        params
313            .entry("_collection".to_string())
314            .or_insert_with(|| serde_json::json!(collection_name));
315        state.db.execute_query(parsed, &params)
316    } else {
317        state.db.execute_query(parsed, &req.params)
318    };
319
320    execute_result.map_err(|e| match e {
321        velesdb_core::Error::CollectionNotFound(name) => velesql_collection_not_found(&name),
322        other => velesql_error(
323            StatusCode::UNPROCESSABLE_ENTITY,
324            "VELESQL_EXECUTION_ERROR",
325            &other.to_string(),
326            "Validate query semantics and parameter types against the target collection",
327            None,
328        ),
329    })
330}
331
332/// Build the final query response with timing metrics and SQL projection.
333fn build_query_response(
334    state: &Arc<AppState>,
335    collection_name: &str,
336    start: std::time::Instant,
337    results: Vec<velesdb_core::SearchResult>,
338    select_columns: &SelectColumns,
339) -> axum::response::Response {
340    let elapsed = start.elapsed();
341    let timing_ms = elapsed.as_secs_f64() * 1000.0;
342    #[allow(clippy::cast_possible_truncation)]
343    // Reason: timing_ms is always < u64::MAX (query durations < 585 millennia)
344    let took_ms = timing_ms.round() as u64;
345    notify_query_timing(state, collection_name, start);
346    state
347        .query_duration_histogram
348        .observe(elapsed.as_secs_f64());
349    let projected = projection::project_results(&results, select_columns);
350    let rows_returned = projected.len();
351
352    Json(QueryResponse {
353        results: projected,
354        timing_ms,
355        took_ms,
356        rows_returned,
357        meta: QueryResponseMeta {
358            velesql_contract_version: VELESQL_CONTRACT_VERSION.to_string(),
359            count: rows_returned,
360        },
361    })
362    .into_response()
363}
364
365/// Detect query type from parsed AST (EPIC-052 US-006).
366///
367/// Priority order:
368/// 1. DDL (CREATE/DROP COLLECTION) -> Ddl
369/// 2. DML (INSERT/UPDATE/DELETE) -> Dml
370/// 3. MATCH clause -> Graph
371/// 4. GROUP BY or aggregates -> Aggregation
372/// 5. Vector search -> Search
373/// 6. Default -> Rows
374#[allow(dead_code)] // Used in tests, will be used in unified handler
375pub fn detect_query_type(query: &Query) -> QueryType {
376    if query.is_ddl_query() {
377        return QueryType::Ddl;
378    }
379
380    if query.is_dml_query() {
381        return QueryType::Dml;
382    }
383
384    if query.is_match_query() {
385        return QueryType::Graph;
386    }
387
388    if is_aggregation_query(&query.select) {
389        return QueryType::Aggregation;
390    }
391
392    let has_vector = query
393        .select
394        .where_clause
395        .as_ref()
396        .map(condition_has_vector_search)
397        .unwrap_or(false);
398
399    if has_vector {
400        return QueryType::Search;
401    }
402
403    QueryType::Rows
404}
405
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `sql` and returns the type `detect_query_type` assigns to it.
    /// Panics if the statement does not parse.
    fn classify(sql: &str) -> QueryType {
        let parsed = velesql::Parser::parse(sql).unwrap();
        detect_query_type(&parsed)
    }

    #[test]
    fn test_detect_query_type_search() {
        let sql = "SELECT * FROM docs WHERE similarity(embedding, $v) > 0.8 LIMIT 10";
        assert_eq!(classify(sql), QueryType::Search);
    }

    #[test]
    fn test_detect_query_type_aggregation() {
        let sql = "SELECT category, COUNT(*) FROM products GROUP BY category";
        assert_eq!(classify(sql), QueryType::Aggregation);
    }

    #[test]
    fn test_detect_query_type_rows() {
        let sql = "SELECT name, price FROM products WHERE price > 100";
        assert_eq!(classify(sql), QueryType::Rows);
    }

    #[test]
    fn test_detect_query_type_graph() {
        let sql = "MATCH (n:Person)-[:KNOWS]->(m) RETURN n.name, m.name LIMIT 10";
        assert_eq!(classify(sql), QueryType::Graph);
    }

    #[test]
    fn test_detect_query_type_hybrid_vector_aggregation() {
        // When both vector search and aggregation, aggregation takes priority
        let sql = "SELECT category, COUNT(*) FROM docs WHERE similarity(embedding, $v) > 0.7 GROUP BY category";
        assert_eq!(classify(sql), QueryType::Aggregation);
    }

    #[test]
    fn test_detect_query_type_ddl_create() {
        let sql = "CREATE COLLECTION docs (dimension = 768, metric = 'cosine');";
        assert_eq!(classify(sql), QueryType::Ddl);
    }

    #[test]
    fn test_detect_query_type_ddl_drop() {
        let sql = "DROP COLLECTION docs;";
        assert_eq!(classify(sql), QueryType::Ddl);
    }

    #[test]
    fn test_detect_query_type_dml_insert_edge() {
        let sql = "INSERT EDGE INTO kg (source = 1, target = 2, label = 'KNOWS');";
        assert_eq!(classify(sql), QueryType::Dml);
    }

    #[test]
    fn test_detect_query_type_dml_delete() {
        let sql = "DELETE FROM docs WHERE id = 1;";
        assert_eq!(classify(sql), QueryType::Dml);
    }
}
480}