fraiseql_core/runtime/executor/aggregate.rs
1//! Aggregate and window query execution.
2
3use super::Executor;
4use crate::{
5 db::traits::DatabaseAdapter,
6 error::{FraiseQLError, Result},
7 runtime::suggest_similar,
8};
9
10impl<A: DatabaseAdapter> Executor<A> {
11 /// Execute an aggregate query dispatch.
12 ///
13 /// # Errors
14 ///
15 /// * [`FraiseQLError::Validation`] — the query name does not end with `_aggregate`, or the
16 /// derived fact table is not found in the compiled schema.
17 /// * Propagates errors from [`execute_aggregate_query`](Self::execute_aggregate_query).
18 pub(super) async fn execute_aggregate_dispatch(
19 &self,
20 query_name: &str,
21 variables: Option<&serde_json::Value>,
22 ) -> Result<String> {
23 // Extract table name from query name (e.g., "sales_aggregate" -> "tf_sales")
24 let table_name =
25 query_name.strip_suffix("_aggregate").ok_or_else(|| FraiseQLError::Validation {
26 message: format!("Invalid aggregate query name: {}", query_name),
27 path: None,
28 })?;
29
30 let fact_table_name = format!("tf_{}", table_name);
31
32 // Get fact table metadata from schema
33 let metadata = self.schema.get_fact_table(&fact_table_name).ok_or_else(|| {
34 let known: Vec<&str> = self.schema.list_fact_tables();
35 let suggestion = suggest_similar(&fact_table_name, &known);
36 let base = format!("Fact table '{}' not found in schema", fact_table_name);
37 let message = match suggestion.as_slice() {
38 [s] => format!("{base}. Did you mean '{s}'?"),
39 _ => base,
40 };
41 FraiseQLError::Validation {
42 message,
43 path: Some(format!("fact_tables.{}", fact_table_name)),
44 }
45 })?;
46
47 // Parse query variables into aggregate query JSON
48 let empty_json = serde_json::json!({});
49 let query_json = variables.unwrap_or(&empty_json);
50
51 // Execute aggregate query
52 self.execute_aggregate_query(query_json, query_name, metadata).await
53 }
54
55 /// Execute a window query dispatch.
56 ///
57 /// # Errors
58 ///
59 /// * [`FraiseQLError::Validation`] — the query name does not end with `_window`, or the derived
60 /// fact table is not found in the compiled schema.
61 /// * Propagates errors from [`execute_window_query`](Self::execute_window_query).
62 pub(super) async fn execute_window_dispatch(
63 &self,
64 query_name: &str,
65 variables: Option<&serde_json::Value>,
66 ) -> Result<String> {
67 // Extract table name from query name (e.g., "sales_window" -> "tf_sales")
68 let table_name =
69 query_name.strip_suffix("_window").ok_or_else(|| FraiseQLError::Validation {
70 message: format!("Invalid window query name: {}", query_name),
71 path: None,
72 })?;
73
74 let fact_table_name = format!("tf_{}", table_name);
75
76 // Get fact table metadata from schema
77 let metadata = self.schema.get_fact_table(&fact_table_name).ok_or_else(|| {
78 let known: Vec<&str> = self.schema.list_fact_tables();
79 let suggestion = suggest_similar(&fact_table_name, &known);
80 let base = format!("Fact table '{}' not found in schema", fact_table_name);
81 let message = match suggestion.as_slice() {
82 [s] => format!("{base}. Did you mean '{s}'?"),
83 _ => base,
84 };
85 FraiseQLError::Validation {
86 message,
87 path: Some(format!("fact_tables.{}", fact_table_name)),
88 }
89 })?;
90
91 // Parse query variables into window query JSON
92 let empty_json = serde_json::json!({});
93 let query_json = variables.unwrap_or(&empty_json);
94
95 // Execute window query
96 self.execute_window_query(query_json, query_name, metadata).await
97 }
98
99 /// Execute an aggregate query.
100 ///
101 /// # Arguments
102 ///
103 /// * `query_json` - JSON representation of the aggregate query
104 /// * `query_name` - GraphQL field name (e.g., "`sales_aggregate`")
105 /// * `metadata` - Fact table metadata
106 ///
107 /// # Returns
108 ///
109 /// GraphQL response as JSON string
110 ///
111 /// # Errors
112 ///
113 /// Returns error if:
114 /// - Query parsing fails
115 /// - Execution plan generation fails
116 /// - SQL generation fails
117 /// - Database execution fails
118 /// - Result projection fails
119 ///
120 /// # Example
121 ///
122 /// ```no_run
123 /// // Requires: a live database adapter and compiled fact table metadata.
124 /// // See: tests/integration/ for runnable examples.
125 /// # use serde_json::json;
126 /// let query_json = json!({
127 /// "table": "tf_sales",
128 /// "groupBy": { "category": true },
129 /// "aggregates": [{"count": {}}]
130 /// });
131 /// // let result = executor.execute_aggregate_query(&query_json, "sales_aggregate", &metadata).await?;
132 /// ```
133 pub async fn execute_aggregate_query(
134 &self,
135 query_json: &serde_json::Value,
136 query_name: &str,
137 metadata: &crate::compiler::fact_table::FactTableMetadata,
138 ) -> Result<String> {
139 // 1. Parse JSON query into AggregationRequest
140 let request = super::super::AggregateQueryParser::parse(query_json, metadata)?;
141
142 // 2. Generate execution plan
143 let plan =
144 crate::compiler::aggregation::AggregationPlanner::plan(request, metadata.clone())?;
145
146 // 3. Generate parameterized SQL
147 let sql_generator =
148 super::super::AggregationSqlGenerator::new(self.adapter.database_type());
149 let parameterized = sql_generator.generate_parameterized(&plan)?;
150
151 // 4. Execute with bind parameters (eliminates escape-based injection risk)
152 let rows = self
153 .adapter
154 .execute_parameterized_aggregate(¶meterized.sql, ¶meterized.params)
155 .await?;
156
157 // 5. Project results
158 let projected = super::super::AggregationProjector::project(rows, &plan)?;
159
160 // 6. Wrap in GraphQL data envelope
161 let response =
162 super::super::AggregationProjector::wrap_in_data_envelope(projected, query_name);
163
164 // 7. Serialize to JSON string
165 Ok(serde_json::to_string(&response)?)
166 }
167
168 /// Execute a window query.
169 ///
170 /// # Arguments
171 ///
172 /// * `query_json` - JSON representation of the window query
173 /// * `query_name` - GraphQL field name (e.g., "`sales_window`")
174 /// * `metadata` - Fact table metadata
175 ///
176 /// # Returns
177 ///
178 /// GraphQL response as JSON string
179 ///
180 /// # Errors
181 ///
182 /// Returns error if:
183 /// - Query parsing fails
184 /// - Execution plan generation fails
185 /// - SQL generation fails
186 /// - Database execution fails
187 /// - Result projection fails
188 ///
189 /// # Example
190 ///
191 /// ```no_run
192 /// // Requires: a live database adapter and compiled fact table metadata.
193 /// // See: tests/integration/ for runnable examples.
194 /// # use serde_json::json;
195 /// let query_json = json!({
196 /// "table": "tf_sales",
197 /// "select": [{"type": "measure", "name": "revenue", "alias": "revenue"}],
198 /// "windows": [{
199 /// "function": {"type": "row_number"},
200 /// "alias": "rank",
201 /// "partitionBy": [{"type": "dimension", "path": "category"}],
202 /// "orderBy": [{"field": "revenue", "direction": "DESC"}]
203 /// }]
204 /// });
205 /// // let result = executor.execute_window_query(&query_json, "sales_window", &metadata).await?;
206 /// ```
207 pub async fn execute_window_query(
208 &self,
209 query_json: &serde_json::Value,
210 query_name: &str,
211 metadata: &crate::compiler::fact_table::FactTableMetadata,
212 ) -> Result<String> {
213 // 1. Parse JSON query into WindowRequest
214 let request = super::super::WindowQueryParser::parse(query_json, metadata)?;
215
216 // 2. Generate execution plan (validates semantic names against metadata)
217 let plan = crate::compiler::window_functions::WindowPlanner::plan(request, metadata)?;
218
219 // 3. Generate SQL
220 let sql_generator = super::super::WindowSqlGenerator::new(self.adapter.database_type());
221 let sql = sql_generator.generate(&plan)?;
222
223 // 4. Execute SQL — bind parameters via execute_parameterized_aggregate so WHERE clause
224 // values are passed as prepared-statement parameters, not inlined.
225 let rows = self
226 .adapter
227 .execute_parameterized_aggregate(&sql.raw_sql, &sql.parameters)
228 .await?;
229
230 // 5. Project results
231 let projected = super::super::WindowProjector::project(rows, &plan)?;
232
233 // 6. Wrap in GraphQL data envelope
234 let response = super::super::WindowProjector::wrap_in_data_envelope(projected, query_name);
235
236 // 7. Serialize to JSON string
237 Ok(serde_json::to_string(&response)?)
238 }
239}