Skip to main content

velesdb_server/handlers/query/
aggregation.rs

1//! Aggregation query dispatch and execution.
2//!
3//! Handles detection and execution of GROUP BY / aggregate function queries,
4//! routing them to `execute_aggregate` on the appropriate collection.
5
6use axum::{http::StatusCode, response::IntoResponse, Json};
7use std::sync::Arc;
8use velesdb_core::velesql::{Query, SelectColumns};
9
10use crate::handlers::helpers::notify_query_timing;
11use crate::types::{
12    AggregationResponse, QueryRequest, QueryResponseMeta, VELESQL_CONTRACT_VERSION,
13};
14use crate::AppState;
15
16use super::velesql_helpers::{parse_and_validate, velesql_collection_not_found, velesql_error};
17
18/// Returns `true` if the query contains aggregation functions or GROUP BY,
19/// but NOT if it's a vector-search GROUP BY (which is handled as post-processing
20/// in `execute_query`, not `execute_aggregate`).
21pub(crate) fn is_aggregation_query(select: &velesdb_core::velesql::SelectStatement) -> bool {
22    let has_aggs = match &select.columns {
23        SelectColumns::Aggregations(_) => true,
24        SelectColumns::Mixed { aggregations, .. } => !aggregations.is_empty(),
25        _ => false,
26    };
27    let is_agg_query = has_aggs || select.group_by.is_some();
28
29    // Vector-search GROUP BY (NEAR + GROUP BY) is handled as post-processing
30    // inside execute_query, not execute_aggregate. Route it to the standard path.
31    if is_agg_query {
32        let has_vector_near = select
33            .where_clause
34            .as_ref()
35            .is_some_and(velesdb_core::velesql::Condition::has_vector_search);
36        if has_vector_near && select.group_by.is_some() {
37            return false;
38        }
39    }
40
41    is_agg_query
42}
43
44fn aggregation_result_count(result: &serde_json::Value) -> usize {
45    match result {
46        serde_json::Value::Array(rows) => rows.len(),
47        serde_json::Value::Object(_) => 1,
48        _ => 0,
49    }
50}
51
52pub(crate) fn execute_aggregation_query(
53    state: &Arc<AppState>,
54    collection_name: &str,
55    parsed: &Query,
56    params: &std::collections::HashMap<String, serde_json::Value>,
57    start: std::time::Instant,
58) -> axum::response::Response {
59    // Prefer typed vector collection for aggregation.
60    let result = if let Some(vc) = state.db.get_vector_collection(collection_name) {
61        vc.execute_aggregate(parsed, params)
62    } else if let Some(any) = state.db.get_any_collection(collection_name) {
63        any.execute_aggregate(parsed, params)
64    } else {
65        state.operational_metrics.inc_errors();
66        return velesql_collection_not_found(collection_name);
67    };
68
69    let result = match result {
70        Ok(r) => r,
71        Err(e) => {
72            state.operational_metrics.inc_errors();
73            return velesql_error(
74                StatusCode::UNPROCESSABLE_ENTITY,
75                "VELESQL_AGGREGATION_ERROR",
76                &e.to_string(),
77                "Verify GROUP BY/HAVING clauses and aggregate function arguments",
78                None,
79            );
80        }
81    };
82
83    let elapsed = start.elapsed();
84    let timing_ms = elapsed.as_secs_f64() * 1000.0;
85    notify_query_timing(state, collection_name, start);
86    state
87        .query_duration_histogram
88        .observe(elapsed.as_secs_f64());
89    let count = aggregation_result_count(&result);
90
91    Json(AggregationResponse {
92        result,
93        timing_ms,
94        meta: QueryResponseMeta {
95            velesql_contract_version: VELESQL_CONTRACT_VERSION.to_string(),
96            count,
97        },
98    })
99    .into_response()
100}
101
102/// Resolve the collection name for an aggregation query.
103#[allow(clippy::result_large_err)]
104pub(crate) fn resolve_aggregate_collection(
105    parsed: &Query,
106    req: &QueryRequest,
107) -> Result<String, axum::response::Response> {
108    if !parsed.select.from.is_empty() {
109        return Ok(parsed.select.from.clone());
110    }
111    req.collection
112        .as_ref()
113        .filter(|name| !name.is_empty())
114        .cloned()
115        .ok_or_else(|| {
116            velesql_error(
117                StatusCode::UNPROCESSABLE_ENTITY,
118                "VELESQL_MISSING_COLLECTION",
119                "Aggregation query requires a FROM collection or request-body `collection`",
120                "Add FROM <collection> to query or set `collection` in request JSON",
121                Some(serde_json::json!({
122                    "field": "collection",
123                    "endpoint": "/aggregate"
124                })),
125            )
126        })
127}
128
129/// Execute an aggregation-only VelesQL query.
130///
131/// This endpoint is explicit and stable for GROUP BY / HAVING / aggregate workloads.
132#[utoipa::path(
133    post,
134    path = "/aggregate",
135    tag = "query",
136    request_body = QueryRequest,
137    responses(
138        (status = 200, description = "Aggregation results", body = AggregationResponse),
139        (status = 400, description = "Query syntax error", body = crate::types::QueryErrorResponse),
140        (status = 422, description = "Aggregation validation/execution error", body = crate::types::VelesqlErrorResponse),
141        (status = 404, description = "Collection not found", body = crate::types::VelesqlErrorResponse)
142    )
143)]
144#[allow(clippy::unused_async)]
145pub async fn aggregate(
146    axum::extract::State(state): axum::extract::State<Arc<AppState>>,
147    Json(req): Json<QueryRequest>,
148) -> impl IntoResponse {
149    let start = std::time::Instant::now();
150    state.operational_metrics.inc_queries();
151
152    let parsed = match parse_and_validate(&req.query) {
153        Ok(q) => q,
154        Err(resp) => {
155            state.operational_metrics.inc_errors();
156            return resp;
157        }
158    };
159
160    if parsed.is_match_query() || !is_aggregation_query(&parsed.select) {
161        state.operational_metrics.inc_errors();
162        return velesql_error(
163            StatusCode::UNPROCESSABLE_ENTITY,
164            "VELESQL_AGGREGATION_ERROR",
165            "Only aggregation queries are accepted on /aggregate",
166            "Use /query for row/search/graph queries; use /aggregate for GROUP BY/aggregate workloads.",
167            Some(serde_json::json!({ "endpoint": "/aggregate" })),
168        );
169    }
170
171    let collection_name = resolve_aggregate_collection(&parsed, &req);
172    let collection_name = match collection_name {
173        Ok(name) => name,
174        Err(resp) => {
175            state.operational_metrics.inc_errors();
176            return resp;
177        }
178    };
179
180    execute_aggregation_query(&state, &collection_name, &parsed, &req.params, start)
181}