velesdb_server/handlers/query/
aggregation.rs1use axum::{http::StatusCode, response::IntoResponse, Json};
7use std::sync::Arc;
8use velesdb_core::velesql::{Query, SelectColumns};
9
10use crate::handlers::helpers::notify_query_timing;
11use crate::types::{
12 AggregationResponse, QueryRequest, QueryResponseMeta, VELESQL_CONTRACT_VERSION,
13};
14use crate::AppState;
15
16use super::velesql_helpers::{parse_and_validate, velesql_collection_not_found, velesql_error};
17
18pub(crate) fn is_aggregation_query(select: &velesdb_core::velesql::SelectStatement) -> bool {
22 let has_aggs = match &select.columns {
23 SelectColumns::Aggregations(_) => true,
24 SelectColumns::Mixed { aggregations, .. } => !aggregations.is_empty(),
25 _ => false,
26 };
27 let is_agg_query = has_aggs || select.group_by.is_some();
28
29 if is_agg_query {
32 let has_vector_near = select
33 .where_clause
34 .as_ref()
35 .is_some_and(velesdb_core::velesql::Condition::has_vector_search);
36 if has_vector_near && select.group_by.is_some() {
37 return false;
38 }
39 }
40
41 is_agg_query
42}
43
44fn aggregation_result_count(result: &serde_json::Value) -> usize {
45 match result {
46 serde_json::Value::Array(rows) => rows.len(),
47 serde_json::Value::Object(_) => 1,
48 _ => 0,
49 }
50}
51
52pub(crate) fn execute_aggregation_query(
53 state: &Arc<AppState>,
54 collection_name: &str,
55 parsed: &Query,
56 params: &std::collections::HashMap<String, serde_json::Value>,
57 start: std::time::Instant,
58) -> axum::response::Response {
59 let result = if let Some(vc) = state.db.get_vector_collection(collection_name) {
61 vc.execute_aggregate(parsed, params)
62 } else if let Some(any) = state.db.get_any_collection(collection_name) {
63 any.execute_aggregate(parsed, params)
64 } else {
65 state.operational_metrics.inc_errors();
66 return velesql_collection_not_found(collection_name);
67 };
68
69 let result = match result {
70 Ok(r) => r,
71 Err(e) => {
72 state.operational_metrics.inc_errors();
73 return velesql_error(
74 StatusCode::UNPROCESSABLE_ENTITY,
75 "VELESQL_AGGREGATION_ERROR",
76 &e.to_string(),
77 "Verify GROUP BY/HAVING clauses and aggregate function arguments",
78 None,
79 );
80 }
81 };
82
83 let elapsed = start.elapsed();
84 let timing_ms = elapsed.as_secs_f64() * 1000.0;
85 notify_query_timing(state, collection_name, start);
86 state
87 .query_duration_histogram
88 .observe(elapsed.as_secs_f64());
89 let count = aggregation_result_count(&result);
90
91 Json(AggregationResponse {
92 result,
93 timing_ms,
94 meta: QueryResponseMeta {
95 velesql_contract_version: VELESQL_CONTRACT_VERSION.to_string(),
96 count,
97 },
98 })
99 .into_response()
100}
101
102#[allow(clippy::result_large_err)]
104pub(crate) fn resolve_aggregate_collection(
105 parsed: &Query,
106 req: &QueryRequest,
107) -> Result<String, axum::response::Response> {
108 if !parsed.select.from.is_empty() {
109 return Ok(parsed.select.from.clone());
110 }
111 req.collection
112 .as_ref()
113 .filter(|name| !name.is_empty())
114 .cloned()
115 .ok_or_else(|| {
116 velesql_error(
117 StatusCode::UNPROCESSABLE_ENTITY,
118 "VELESQL_MISSING_COLLECTION",
119 "Aggregation query requires a FROM collection or request-body `collection`",
120 "Add FROM <collection> to query or set `collection` in request JSON",
121 Some(serde_json::json!({
122 "field": "collection",
123 "endpoint": "/aggregate"
124 })),
125 )
126 })
127}
128
129#[utoipa::path(
133 post,
134 path = "/aggregate",
135 tag = "query",
136 request_body = QueryRequest,
137 responses(
138 (status = 200, description = "Aggregation results", body = AggregationResponse),
139 (status = 400, description = "Query syntax error", body = crate::types::QueryErrorResponse),
140 (status = 422, description = "Aggregation validation/execution error", body = crate::types::VelesqlErrorResponse),
141 (status = 404, description = "Collection not found", body = crate::types::VelesqlErrorResponse)
142 )
143)]
144#[allow(clippy::unused_async)]
145pub async fn aggregate(
146 axum::extract::State(state): axum::extract::State<Arc<AppState>>,
147 Json(req): Json<QueryRequest>,
148) -> impl IntoResponse {
149 let start = std::time::Instant::now();
150 state.operational_metrics.inc_queries();
151
152 let parsed = match parse_and_validate(&req.query) {
153 Ok(q) => q,
154 Err(resp) => {
155 state.operational_metrics.inc_errors();
156 return resp;
157 }
158 };
159
160 if parsed.is_match_query() || !is_aggregation_query(&parsed.select) {
161 state.operational_metrics.inc_errors();
162 return velesql_error(
163 StatusCode::UNPROCESSABLE_ENTITY,
164 "VELESQL_AGGREGATION_ERROR",
165 "Only aggregation queries are accepted on /aggregate",
166 "Use /query for row/search/graph queries; use /aggregate for GROUP BY/aggregate workloads.",
167 Some(serde_json::json!({ "endpoint": "/aggregate" })),
168 );
169 }
170
171 let collection_name = resolve_aggregate_collection(&parsed, &req);
172 let collection_name = match collection_name {
173 Ok(name) => name,
174 Err(resp) => {
175 state.operational_metrics.inc_errors();
176 return resp;
177 }
178 };
179
180 execute_aggregation_query(&state, &collection_name, &parsed, &req.params, start)
181}