Skip to main content

fraiseql_core/runtime/executor/
aggregate.rs

1//! Aggregate and window query execution.
2
3use super::Executor;
4use crate::{
5    db::traits::DatabaseAdapter,
6    error::{FraiseQLError, Result},
7    runtime::suggest_similar,
8};
9
10impl<A: DatabaseAdapter> Executor<A> {
11    /// Execute an aggregate query dispatch.
12    ///
13    /// # Errors
14    ///
15    /// * [`FraiseQLError::Validation`] — the query name does not end with `_aggregate`, or the
16    ///   derived fact table is not found in the compiled schema.
17    /// * Propagates errors from [`execute_aggregate_query`](Self::execute_aggregate_query).
18    pub(super) async fn execute_aggregate_dispatch(
19        &self,
20        query_name: &str,
21        variables: Option<&serde_json::Value>,
22    ) -> Result<String> {
23        // Extract table name from query name (e.g., "sales_aggregate" -> "tf_sales")
24        let table_name =
25            query_name.strip_suffix("_aggregate").ok_or_else(|| FraiseQLError::Validation {
26                message: format!("Invalid aggregate query name: {}", query_name),
27                path:    None,
28            })?;
29
30        let fact_table_name = format!("tf_{}", table_name);
31
32        // Get fact table metadata from schema
33        let metadata = self.schema.get_fact_table(&fact_table_name).ok_or_else(|| {
34            let known: Vec<&str> = self.schema.list_fact_tables();
35            let suggestion = suggest_similar(&fact_table_name, &known);
36            let base = format!("Fact table '{}' not found in schema", fact_table_name);
37            let message = match suggestion.as_slice() {
38                [s] => format!("{base}. Did you mean '{s}'?"),
39                _ => base,
40            };
41            FraiseQLError::Validation {
42                message,
43                path: Some(format!("fact_tables.{}", fact_table_name)),
44            }
45        })?;
46
47        // Parse query variables into aggregate query JSON
48        let empty_json = serde_json::json!({});
49        let query_json = variables.unwrap_or(&empty_json);
50
51        // Execute aggregate query
52        self.execute_aggregate_query(query_json, query_name, metadata).await
53    }
54
55    /// Execute a window query dispatch.
56    ///
57    /// # Errors
58    ///
59    /// * [`FraiseQLError::Validation`] — the query name does not end with `_window`, or the derived
60    ///   fact table is not found in the compiled schema.
61    /// * Propagates errors from [`execute_window_query`](Self::execute_window_query).
62    pub(super) async fn execute_window_dispatch(
63        &self,
64        query_name: &str,
65        variables: Option<&serde_json::Value>,
66    ) -> Result<String> {
67        // Extract table name from query name (e.g., "sales_window" -> "tf_sales")
68        let table_name =
69            query_name.strip_suffix("_window").ok_or_else(|| FraiseQLError::Validation {
70                message: format!("Invalid window query name: {}", query_name),
71                path:    None,
72            })?;
73
74        let fact_table_name = format!("tf_{}", table_name);
75
76        // Get fact table metadata from schema
77        let metadata = self.schema.get_fact_table(&fact_table_name).ok_or_else(|| {
78            let known: Vec<&str> = self.schema.list_fact_tables();
79            let suggestion = suggest_similar(&fact_table_name, &known);
80            let base = format!("Fact table '{}' not found in schema", fact_table_name);
81            let message = match suggestion.as_slice() {
82                [s] => format!("{base}. Did you mean '{s}'?"),
83                _ => base,
84            };
85            FraiseQLError::Validation {
86                message,
87                path: Some(format!("fact_tables.{}", fact_table_name)),
88            }
89        })?;
90
91        // Parse query variables into window query JSON
92        let empty_json = serde_json::json!({});
93        let query_json = variables.unwrap_or(&empty_json);
94
95        // Execute window query
96        self.execute_window_query(query_json, query_name, metadata).await
97    }
98
99    /// Execute an aggregate query.
100    ///
101    /// # Arguments
102    ///
103    /// * `query_json` - JSON representation of the aggregate query
104    /// * `query_name` - GraphQL field name (e.g., "`sales_aggregate`")
105    /// * `metadata` - Fact table metadata
106    ///
107    /// # Returns
108    ///
109    /// GraphQL response as JSON string
110    ///
111    /// # Errors
112    ///
113    /// Returns error if:
114    /// - Query parsing fails
115    /// - Execution plan generation fails
116    /// - SQL generation fails
117    /// - Database execution fails
118    /// - Result projection fails
119    ///
120    /// # Example
121    ///
122    /// ```no_run
123    /// // Requires: a live database adapter and compiled fact table metadata.
124    /// // See: tests/integration/ for runnable examples.
125    /// # use serde_json::json;
126    /// let query_json = json!({
127    ///     "table": "tf_sales",
128    ///     "groupBy": { "category": true },
129    ///     "aggregates": [{"count": {}}]
130    /// });
131    /// // let result = executor.execute_aggregate_query(&query_json, "sales_aggregate", &metadata).await?;
132    /// ```
133    pub async fn execute_aggregate_query(
134        &self,
135        query_json: &serde_json::Value,
136        query_name: &str,
137        metadata: &crate::compiler::fact_table::FactTableMetadata,
138    ) -> Result<String> {
139        // 1. Parse JSON query into AggregationRequest
140        let request = super::super::AggregateQueryParser::parse(query_json, metadata)?;
141
142        // 2. Generate execution plan
143        let plan =
144            crate::compiler::aggregation::AggregationPlanner::plan(request, metadata.clone())?;
145
146        // 3. Generate parameterized SQL
147        let sql_generator =
148            super::super::AggregationSqlGenerator::new(self.adapter.database_type());
149        let parameterized = sql_generator.generate_parameterized(&plan)?;
150
151        // 4. Execute with bind parameters (eliminates escape-based injection risk)
152        let rows = self
153            .adapter
154            .execute_parameterized_aggregate(&parameterized.sql, &parameterized.params)
155            .await?;
156
157        // 5. Project results
158        let projected = super::super::AggregationProjector::project(rows, &plan)?;
159
160        // 6. Wrap in GraphQL data envelope
161        let response =
162            super::super::AggregationProjector::wrap_in_data_envelope(projected, query_name);
163
164        // 7. Serialize to JSON string
165        Ok(serde_json::to_string(&response)?)
166    }
167
168    /// Execute a window query.
169    ///
170    /// # Arguments
171    ///
172    /// * `query_json` - JSON representation of the window query
173    /// * `query_name` - GraphQL field name (e.g., "`sales_window`")
174    /// * `metadata` - Fact table metadata
175    ///
176    /// # Returns
177    ///
178    /// GraphQL response as JSON string
179    ///
180    /// # Errors
181    ///
182    /// Returns error if:
183    /// - Query parsing fails
184    /// - Execution plan generation fails
185    /// - SQL generation fails
186    /// - Database execution fails
187    /// - Result projection fails
188    ///
189    /// # Example
190    ///
191    /// ```no_run
192    /// // Requires: a live database adapter and compiled fact table metadata.
193    /// // See: tests/integration/ for runnable examples.
194    /// # use serde_json::json;
195    /// let query_json = json!({
196    ///     "table": "tf_sales",
197    ///     "select": [{"type": "measure", "name": "revenue", "alias": "revenue"}],
198    ///     "windows": [{
199    ///         "function": {"type": "row_number"},
200    ///         "alias": "rank",
201    ///         "partitionBy": [{"type": "dimension", "path": "category"}],
202    ///         "orderBy": [{"field": "revenue", "direction": "DESC"}]
203    ///     }]
204    /// });
205    /// // let result = executor.execute_window_query(&query_json, "sales_window", &metadata).await?;
206    /// ```
207    pub async fn execute_window_query(
208        &self,
209        query_json: &serde_json::Value,
210        query_name: &str,
211        metadata: &crate::compiler::fact_table::FactTableMetadata,
212    ) -> Result<String> {
213        // 1. Parse JSON query into WindowRequest
214        let request = super::super::WindowQueryParser::parse(query_json, metadata)?;
215
216        // 2. Generate execution plan (validates semantic names against metadata)
217        let plan = crate::compiler::window_functions::WindowPlanner::plan(request, metadata)?;
218
219        // 3. Generate SQL
220        let sql_generator = super::super::WindowSqlGenerator::new(self.adapter.database_type());
221        let sql = sql_generator.generate(&plan)?;
222
223        // 4. Execute SQL — bind parameters via execute_parameterized_aggregate so WHERE clause
224        //    values are passed as prepared-statement parameters, not inlined.
225        let rows = self
226            .adapter
227            .execute_parameterized_aggregate(&sql.raw_sql, &sql.parameters)
228            .await?;
229
230        // 5. Project results
231        let projected = super::super::WindowProjector::project(rows, &plan)?;
232
233        // 6. Wrap in GraphQL data envelope
234        let response = super::super::WindowProjector::wrap_in_data_envelope(projected, query_name);
235
236        // 7. Serialize to JSON string
237        Ok(serde_json::to_string(&response)?)
238    }
239}