Skip to main content

fraiseql_core/runtime/executor/
aggregate.rs

1//! Aggregate and window query execution.
2
3use super::Executor;
4use crate::{
5    db::traits::DatabaseAdapter,
6    error::{FraiseQLError, Result},
7    runtime::suggest_similar,
8};
9
10impl<A: DatabaseAdapter> Executor<A> {
11    /// Execute an aggregate query dispatch.
12    ///
13    /// # Errors
14    ///
15    /// * [`FraiseQLError::Validation`] — the query name does not end with `_aggregate`, or the
16    ///   derived fact table is not found in the compiled schema.
17    /// * Propagates errors from [`execute_aggregate_query`](Self::execute_aggregate_query).
18    pub(super) async fn execute_aggregate_dispatch(
19        &self,
20        query_name: &str,
21        variables: Option<&serde_json::Value>,
22    ) -> Result<serde_json::Value> {
23        // Extract table name from query name (e.g., "sales_aggregate" -> "tf_sales")
24        let table_name =
25            query_name.strip_suffix("_aggregate").ok_or_else(|| FraiseQLError::Validation {
26                message: format!("Invalid aggregate query name: {}", query_name),
27                path:    None,
28            })?;
29
30        let fact_table_name = format!("tf_{}", table_name);
31
32        // Get fact table metadata from schema
33        let metadata = self.schema.get_fact_table(&fact_table_name).ok_or_else(|| {
34            let known: Vec<&str> = self.schema.list_fact_tables();
35            let suggestion = suggest_similar(&fact_table_name, &known);
36            let base = format!("Fact table '{}' not found in schema", fact_table_name);
37            let message = match suggestion.as_slice() {
38                [s] => format!("{base}. Did you mean '{s}'?"),
39                _ => base,
40            };
41            FraiseQLError::Validation {
42                message,
43                path: Some(format!("fact_tables.{}", fact_table_name)),
44            }
45        })?;
46
47        // Parse query variables into aggregate query JSON
48        let empty_json = serde_json::json!({});
49        let query_json = variables.unwrap_or(&empty_json);
50
51        // Execute aggregate query
52        self.execute_aggregate_query(query_json, query_name, metadata).await
53    }
54
55    /// Execute a window query dispatch.
56    ///
57    /// # Errors
58    ///
59    /// * [`FraiseQLError::Validation`] — the query name does not end with `_window`, or the derived
60    ///   fact table is not found in the compiled schema.
61    /// * Propagates errors from [`execute_window_query`](Self::execute_window_query).
62    pub(super) async fn execute_window_dispatch(
63        &self,
64        query_name: &str,
65        variables: Option<&serde_json::Value>,
66    ) -> Result<serde_json::Value> {
67        // Extract table name from query name (e.g., "sales_window" -> "tf_sales")
68        let table_name =
69            query_name.strip_suffix("_window").ok_or_else(|| FraiseQLError::Validation {
70                message: format!("Invalid window query name: {}", query_name),
71                path:    None,
72            })?;
73
74        let fact_table_name = format!("tf_{}", table_name);
75
76        // Get fact table metadata from schema
77        let metadata = self.schema.get_fact_table(&fact_table_name).ok_or_else(|| {
78            let known: Vec<&str> = self.schema.list_fact_tables();
79            let suggestion = suggest_similar(&fact_table_name, &known);
80            let base = format!("Fact table '{}' not found in schema", fact_table_name);
81            let message = match suggestion.as_slice() {
82                [s] => format!("{base}. Did you mean '{s}'?"),
83                _ => base,
84            };
85            FraiseQLError::Validation {
86                message,
87                path: Some(format!("fact_tables.{}", fact_table_name)),
88            }
89        })?;
90
91        // Parse query variables into window query JSON
92        let empty_json = serde_json::json!({});
93        let query_json = variables.unwrap_or(&empty_json);
94
95        // Execute window query
96        self.execute_window_query(query_json, query_name, metadata).await
97    }
98
99    /// Execute an aggregate query.
100    ///
101    /// # Arguments
102    ///
103    /// * `query_json` - JSON representation of the aggregate query
104    /// * `query_name` - GraphQL field name (e.g., "`sales_aggregate`")
105    /// * `metadata` - Fact table metadata
106    ///
107    /// # Returns
108    ///
109    /// GraphQL response as JSON string
110    ///
111    /// # Errors
112    ///
113    /// Returns error if:
114    /// - Query parsing fails
115    /// - Execution plan generation fails
116    /// - SQL generation fails
117    /// - Database execution fails
118    /// - Result projection fails
119    ///
120    /// # Example
121    ///
122    /// ```no_run
123    /// // Requires: a live database adapter and compiled fact table metadata.
124    /// // See: tests/integration/ for runnable examples.
125    /// # use serde_json::json;
126    /// let query_json = json!({
127    ///     "table": "tf_sales",
128    ///     "groupBy": { "category": true },
129    ///     "aggregates": [{"count": {}}]
130    /// });
131    /// // let result = executor.execute_aggregate_query(&query_json, "sales_aggregate", &metadata).await?;
132    /// ```
133    pub async fn execute_aggregate_query(
134        &self,
135        query_json: &serde_json::Value,
136        query_name: &str,
137        metadata: &crate::compiler::fact_table::FactTableMetadata,
138    ) -> Result<serde_json::Value> {
139        // 1. Parse JSON query into AggregationRequest Build native_columns from
140        //    denormalized_filters so the parser can emit direct column references instead of JSONB
141        //    extraction for native columns.
142        let native_columns = crate::runtime::native_columns::filter_columns_to_native_map(
143            &metadata.denormalized_filters,
144        );
145        let request =
146            super::super::AggregateQueryParser::parse(query_json, metadata, &native_columns)?;
147
148        // 2. Generate execution plan
149        let plan =
150            crate::compiler::aggregation::AggregationPlanner::plan(request, metadata.clone())?;
151
152        // 3. Generate parameterized SQL
153        let sql_generator =
154            super::super::AggregationSqlGenerator::new(self.adapter.database_type());
155        let parameterized = sql_generator.generate_parameterized(&plan)?;
156
157        // 4. Execute with bind parameters (eliminates escape-based injection risk)
158        let rows = self
159            .adapter
160            .execute_parameterized_aggregate(&parameterized.sql, &parameterized.params)
161            .await?;
162
163        // 5. Project results
164        let projected = super::super::AggregationProjector::project(rows, &plan)?;
165
166        // 6. Wrap in GraphQL data envelope
167        let response =
168            super::super::AggregationProjector::wrap_in_data_envelope(projected, query_name);
169
170        // 7. Serialize to JSON string
171        Ok(response)
172    }
173
174    /// Execute a window query.
175    ///
176    /// # Arguments
177    ///
178    /// * `query_json` - JSON representation of the window query
179    /// * `query_name` - GraphQL field name (e.g., "`sales_window`")
180    /// * `metadata` - Fact table metadata
181    ///
182    /// # Returns
183    ///
184    /// GraphQL response as JSON string
185    ///
186    /// # Errors
187    ///
188    /// Returns error if:
189    /// - Query parsing fails
190    /// - Execution plan generation fails
191    /// - SQL generation fails
192    /// - Database execution fails
193    /// - Result projection fails
194    ///
195    /// # Example
196    ///
197    /// ```no_run
198    /// // Requires: a live database adapter and compiled fact table metadata.
199    /// // See: tests/integration/ for runnable examples.
200    /// # use serde_json::json;
201    /// let query_json = json!({
202    ///     "table": "tf_sales",
203    ///     "select": [{"type": "measure", "name": "revenue", "alias": "revenue"}],
204    ///     "windows": [{
205    ///         "function": {"type": "row_number"},
206    ///         "alias": "rank",
207    ///         "partitionBy": [{"type": "dimension", "path": "category"}],
208    ///         "orderBy": [{"field": "revenue", "direction": "DESC"}]
209    ///     }]
210    /// });
211    /// // let result = executor.execute_window_query(&query_json, "sales_window", &metadata).await?;
212    /// ```
213    pub async fn execute_window_query(
214        &self,
215        query_json: &serde_json::Value,
216        query_name: &str,
217        metadata: &crate::compiler::fact_table::FactTableMetadata,
218    ) -> Result<serde_json::Value> {
219        // 1. Parse JSON query into WindowRequest
220        let request = super::super::WindowQueryParser::parse(query_json, metadata)?;
221
222        // 2. Generate execution plan (validates semantic names against metadata)
223        let plan = crate::compiler::window_functions::WindowPlanner::plan(request, metadata)?;
224
225        // 3. Generate SQL
226        let sql_generator = super::super::WindowSqlGenerator::new(self.adapter.database_type());
227        let sql = sql_generator.generate(&plan)?;
228
229        // 4. Execute SQL — bind parameters via execute_parameterized_aggregate so WHERE clause
230        //    values are passed as prepared-statement parameters, not inlined.
231        let rows = self
232            .adapter
233            .execute_parameterized_aggregate(&sql.raw_sql, &sql.parameters)
234            .await?;
235
236        // 5. Project results
237        let projected = super::super::WindowProjector::project(rows, &plan)?;
238
239        // 6. Wrap in GraphQL data envelope
240        let response = super::super::WindowProjector::wrap_in_data_envelope(projected, query_name);
241
242        // 7. Serialize to JSON string
243        Ok(response)
244    }
245}