fraiseql_core/runtime/executor/aggregate.rs
1//! Aggregate and window query execution.
2
3use super::Executor;
4use crate::{
5 db::traits::DatabaseAdapter,
6 error::{FraiseQLError, Result},
7 runtime::suggest_similar,
8};
9
10impl<A: DatabaseAdapter> Executor<A> {
11 /// Execute an aggregate query dispatch.
12 ///
13 /// # Errors
14 ///
15 /// * [`FraiseQLError::Validation`] — the query name does not end with `_aggregate`, or the
16 /// derived fact table is not found in the compiled schema.
17 /// * Propagates errors from [`execute_aggregate_query`](Self::execute_aggregate_query).
18 pub(super) async fn execute_aggregate_dispatch(
19 &self,
20 query_name: &str,
21 variables: Option<&serde_json::Value>,
22 ) -> Result<serde_json::Value> {
23 // Extract table name from query name (e.g., "sales_aggregate" -> "tf_sales")
24 let table_name =
25 query_name.strip_suffix("_aggregate").ok_or_else(|| FraiseQLError::Validation {
26 message: format!("Invalid aggregate query name: {}", query_name),
27 path: None,
28 })?;
29
30 let fact_table_name = format!("tf_{}", table_name);
31
32 // Get fact table metadata from schema
33 let metadata = self.schema.get_fact_table(&fact_table_name).ok_or_else(|| {
34 let known: Vec<&str> = self.schema.list_fact_tables();
35 let suggestion = suggest_similar(&fact_table_name, &known);
36 let base = format!("Fact table '{}' not found in schema", fact_table_name);
37 let message = match suggestion.as_slice() {
38 [s] => format!("{base}. Did you mean '{s}'?"),
39 _ => base,
40 };
41 FraiseQLError::Validation {
42 message,
43 path: Some(format!("fact_tables.{}", fact_table_name)),
44 }
45 })?;
46
47 // Parse query variables into aggregate query JSON
48 let empty_json = serde_json::json!({});
49 let query_json = variables.unwrap_or(&empty_json);
50
51 // Execute aggregate query
52 self.execute_aggregate_query(query_json, query_name, metadata).await
53 }
54
55 /// Execute a window query dispatch.
56 ///
57 /// # Errors
58 ///
59 /// * [`FraiseQLError::Validation`] — the query name does not end with `_window`, or the derived
60 /// fact table is not found in the compiled schema.
61 /// * Propagates errors from [`execute_window_query`](Self::execute_window_query).
62 pub(super) async fn execute_window_dispatch(
63 &self,
64 query_name: &str,
65 variables: Option<&serde_json::Value>,
66 ) -> Result<serde_json::Value> {
67 // Extract table name from query name (e.g., "sales_window" -> "tf_sales")
68 let table_name =
69 query_name.strip_suffix("_window").ok_or_else(|| FraiseQLError::Validation {
70 message: format!("Invalid window query name: {}", query_name),
71 path: None,
72 })?;
73
74 let fact_table_name = format!("tf_{}", table_name);
75
76 // Get fact table metadata from schema
77 let metadata = self.schema.get_fact_table(&fact_table_name).ok_or_else(|| {
78 let known: Vec<&str> = self.schema.list_fact_tables();
79 let suggestion = suggest_similar(&fact_table_name, &known);
80 let base = format!("Fact table '{}' not found in schema", fact_table_name);
81 let message = match suggestion.as_slice() {
82 [s] => format!("{base}. Did you mean '{s}'?"),
83 _ => base,
84 };
85 FraiseQLError::Validation {
86 message,
87 path: Some(format!("fact_tables.{}", fact_table_name)),
88 }
89 })?;
90
91 // Parse query variables into window query JSON
92 let empty_json = serde_json::json!({});
93 let query_json = variables.unwrap_or(&empty_json);
94
95 // Execute window query
96 self.execute_window_query(query_json, query_name, metadata).await
97 }
98
99 /// Execute an aggregate query.
100 ///
101 /// # Arguments
102 ///
103 /// * `query_json` - JSON representation of the aggregate query
104 /// * `query_name` - GraphQL field name (e.g., "`sales_aggregate`")
105 /// * `metadata` - Fact table metadata
106 ///
107 /// # Returns
108 ///
109 /// GraphQL response as JSON string
110 ///
111 /// # Errors
112 ///
113 /// Returns error if:
114 /// - Query parsing fails
115 /// - Execution plan generation fails
116 /// - SQL generation fails
117 /// - Database execution fails
118 /// - Result projection fails
119 ///
120 /// # Example
121 ///
122 /// ```no_run
123 /// // Requires: a live database adapter and compiled fact table metadata.
124 /// // See: tests/integration/ for runnable examples.
125 /// # use serde_json::json;
126 /// let query_json = json!({
127 /// "table": "tf_sales",
128 /// "groupBy": { "category": true },
129 /// "aggregates": [{"count": {}}]
130 /// });
131 /// // let result = executor.execute_aggregate_query(&query_json, "sales_aggregate", &metadata).await?;
132 /// ```
133 pub async fn execute_aggregate_query(
134 &self,
135 query_json: &serde_json::Value,
136 query_name: &str,
137 metadata: &crate::compiler::fact_table::FactTableMetadata,
138 ) -> Result<serde_json::Value> {
139 // 1. Parse JSON query into AggregationRequest Build native_columns from
140 // denormalized_filters so the parser can emit direct column references instead of JSONB
141 // extraction for native columns.
142 let native_columns = crate::runtime::native_columns::filter_columns_to_native_map(
143 &metadata.denormalized_filters,
144 );
145 let request =
146 super::super::AggregateQueryParser::parse(query_json, metadata, &native_columns)?;
147
148 // 2. Generate execution plan
149 let plan =
150 crate::compiler::aggregation::AggregationPlanner::plan(request, metadata.clone())?;
151
152 // 3. Generate parameterized SQL
153 let sql_generator =
154 super::super::AggregationSqlGenerator::new(self.adapter.database_type());
155 let parameterized = sql_generator.generate_parameterized(&plan)?;
156
157 // 4. Execute with bind parameters (eliminates escape-based injection risk)
158 let rows = self
159 .adapter
160 .execute_parameterized_aggregate(¶meterized.sql, ¶meterized.params)
161 .await?;
162
163 // 5. Project results
164 let projected = super::super::AggregationProjector::project(rows, &plan)?;
165
166 // 6. Wrap in GraphQL data envelope
167 let response =
168 super::super::AggregationProjector::wrap_in_data_envelope(projected, query_name);
169
170 // 7. Serialize to JSON string
171 Ok(response)
172 }
173
174 /// Execute a window query.
175 ///
176 /// # Arguments
177 ///
178 /// * `query_json` - JSON representation of the window query
179 /// * `query_name` - GraphQL field name (e.g., "`sales_window`")
180 /// * `metadata` - Fact table metadata
181 ///
182 /// # Returns
183 ///
184 /// GraphQL response as JSON string
185 ///
186 /// # Errors
187 ///
188 /// Returns error if:
189 /// - Query parsing fails
190 /// - Execution plan generation fails
191 /// - SQL generation fails
192 /// - Database execution fails
193 /// - Result projection fails
194 ///
195 /// # Example
196 ///
197 /// ```no_run
198 /// // Requires: a live database adapter and compiled fact table metadata.
199 /// // See: tests/integration/ for runnable examples.
200 /// # use serde_json::json;
201 /// let query_json = json!({
202 /// "table": "tf_sales",
203 /// "select": [{"type": "measure", "name": "revenue", "alias": "revenue"}],
204 /// "windows": [{
205 /// "function": {"type": "row_number"},
206 /// "alias": "rank",
207 /// "partitionBy": [{"type": "dimension", "path": "category"}],
208 /// "orderBy": [{"field": "revenue", "direction": "DESC"}]
209 /// }]
210 /// });
211 /// // let result = executor.execute_window_query(&query_json, "sales_window", &metadata).await?;
212 /// ```
213 pub async fn execute_window_query(
214 &self,
215 query_json: &serde_json::Value,
216 query_name: &str,
217 metadata: &crate::compiler::fact_table::FactTableMetadata,
218 ) -> Result<serde_json::Value> {
219 // 1. Parse JSON query into WindowRequest
220 let request = super::super::WindowQueryParser::parse(query_json, metadata)?;
221
222 // 2. Generate execution plan (validates semantic names against metadata)
223 let plan = crate::compiler::window_functions::WindowPlanner::plan(request, metadata)?;
224
225 // 3. Generate SQL
226 let sql_generator = super::super::WindowSqlGenerator::new(self.adapter.database_type());
227 let sql = sql_generator.generate(&plan)?;
228
229 // 4. Execute SQL — bind parameters via execute_parameterized_aggregate so WHERE clause
230 // values are passed as prepared-statement parameters, not inlined.
231 let rows = self
232 .adapter
233 .execute_parameterized_aggregate(&sql.raw_sql, &sql.parameters)
234 .await?;
235
236 // 5. Project results
237 let projected = super::super::WindowProjector::project(rows, &plan)?;
238
239 // 6. Wrap in GraphQL data envelope
240 let response = super::super::WindowProjector::wrap_in_data_envelope(projected, query_name);
241
242 // 7. Serialize to JSON string
243 Ok(response)
244 }
245}