1pub mod executor;
6pub mod expr;
7mod metrics;
8mod optimizer;
9mod parser;
10mod planner;
11mod prepared;
12mod window;
13
14pub use executor::{ExecutionContext, Executor, QueryResult, Row};
15pub use expr::{
16 AggregateFunction,
17 BinaryOp,
18 Expr,
19 FrameBound,
20 FrameUnit,
21 SubqueryCompareOp,
22 SubqueryType,
23 UnaryOp,
24 WindowFrame,
25 WindowFunction,
27 WindowFunctionType,
28 WindowOrderByExpr,
29 WindowSortOrder,
30};
31pub use metrics::{QueryMetrics, QueryMetricsSnapshot};
32pub use optimizer::{cost_constants, CostEstimator, Optimizer};
33pub use parser::{
34 CustomStatement, ParseResult, Parser, ShowGrantsTarget as ParserShowGrantsTarget,
35};
36pub use planner::{
37 CommonTableExpression, IndexBound, IndexRange, JoinType, LogicalPlan, Planner, PlannerContext,
38 ShowGrantsTarget, SortOrder,
39};
40pub use prepared::{
41 count_params, count_plan_params, substitute_params, substitute_plan_params, CacheStats,
42 PlanCache, PreparedStatement,
43};
44pub use window::WindowExecutor;
45
46use featherdb_catalog::Catalog;
47use featherdb_core::Result;
48use featherdb_mvcc::{Snapshot, TransactionManager};
49use featherdb_storage::BufferPool;
50use std::sync::Arc;
51
52pub struct QueryEngine {
57 optimizer: Optimizer,
58 metrics: Option<Arc<QueryMetrics>>,
59}
60
61impl QueryEngine {
62 pub fn new() -> Self {
64 Self {
65 optimizer: Optimizer::new(),
66 metrics: None,
67 }
68 }
69
70 pub fn with_metrics() -> (Self, Arc<QueryMetrics>) {
72 let metrics = Arc::new(QueryMetrics::new());
73 let engine = Self {
74 optimizer: Optimizer::new(),
75 metrics: Some(metrics.clone()),
76 };
77 (engine, metrics)
78 }
79
80 pub fn enable_metrics(&mut self) -> Arc<QueryMetrics> {
82 let metrics = Arc::new(QueryMetrics::new());
83 self.metrics = Some(metrics.clone());
84 metrics
85 }
86
87 pub fn disable_metrics(&mut self) {
89 self.metrics = None;
90 }
91
92 pub fn metrics(&self) -> Option<&Arc<QueryMetrics>> {
94 self.metrics.as_ref()
95 }
96
97 pub fn execute_sql(
106 &self,
107 sql: &str,
108 catalog: &Catalog,
109 buffer_pool: &Arc<BufferPool>,
110 snapshot: &Snapshot,
111 txn_manager: &TransactionManager,
112 txn_id: featherdb_core::TransactionId,
113 ) -> Result<QueryResult> {
114 use std::collections::HashMap;
115
116 if let Some(metrics) = &self.metrics {
117 metrics.increment_queries();
118
119 let parse_start = std::time::Instant::now();
121 let stmt = Parser::parse_one(sql)?;
122 metrics.record_parse_time(parse_start.elapsed().as_micros() as u64);
123
124 let plan_start = std::time::Instant::now();
126 let planner = Planner::new(catalog);
127 let plan = planner.plan(&stmt)?;
128 let subquery_plans = planner.take_subquery_plans();
129 let optimized = self.optimizer.optimize(plan);
130 metrics.record_plan_time(plan_start.elapsed().as_micros() as u64);
131
132 let ctx = ExecutionContext {
134 catalog,
135 buffer_pool,
136 snapshot,
137 txn_manager,
138 txn_id,
139 cte_context: HashMap::new(),
140 subquery_plans,
141 metrics: Some(metrics),
142 api_key_id: None, auth_enabled: false, };
145
146 Executor::execute_with_timing(&ctx, &optimized)
147 } else {
148 let stmt = Parser::parse_one(sql)?;
150 let planner = Planner::new(catalog);
151 let plan = planner.plan(&stmt)?;
152 let subquery_plans = planner.take_subquery_plans();
153 let optimized = self.optimizer.optimize(plan);
154
155 let ctx = ExecutionContext {
156 catalog,
157 buffer_pool,
158 snapshot,
159 txn_manager,
160 txn_id,
161 cte_context: HashMap::new(),
162 subquery_plans,
163 metrics: None,
164 api_key_id: None, auth_enabled: false, };
167
168 Executor::execute(&ctx, &optimized)
169 }
170 }
171
172 pub fn execute_prepared(
174 &self,
175 plan: &LogicalPlan,
176 catalog: &Catalog,
177 buffer_pool: &Arc<BufferPool>,
178 snapshot: &Snapshot,
179 txn_manager: &TransactionManager,
180 txn_id: featherdb_core::TransactionId,
181 ) -> Result<QueryResult> {
182 use std::collections::HashMap;
183
184 let ctx = ExecutionContext {
185 catalog,
186 buffer_pool,
187 snapshot,
188 txn_manager,
189 txn_id,
190 cte_context: HashMap::new(),
191 subquery_plans: HashMap::new(),
192 metrics: self.metrics.as_ref(),
193 api_key_id: None, auth_enabled: false, };
196
197 if self.metrics.is_some() {
198 Executor::execute_with_timing(&ctx, plan)
199 } else {
200 Executor::execute(&ctx, plan)
201 }
202 }
203}
204
205impl Default for QueryEngine {
206 fn default() -> Self {
207 Self::new()
208 }
209}
210
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` starts without metrics; `with_metrics` starts with zeroed counters.
    #[test]
    fn test_query_engine_creation() {
        let plain = QueryEngine::new();
        assert!(
            plain.metrics().is_none(),
            "a plain engine must not carry metrics"
        );

        let (instrumented, handle) = QueryEngine::with_metrics();
        assert!(
            instrumented.metrics().is_some(),
            "with_metrics must attach metrics to the engine"
        );

        // Nothing has been executed yet, so every counter must still be zero.
        let initial = handle.snapshot();
        assert_eq!(initial.queries_executed, 0);
        assert_eq!(initial.total_parse_time_us, 0);
        assert_eq!(initial.total_plan_time_us, 0);
        assert_eq!(initial.total_exec_time_us, 0);
        assert_eq!(initial.rows_scanned, 0);
        assert_eq!(initial.rows_returned, 0);
    }

    /// Metrics can be switched on and back off on an existing engine.
    #[test]
    fn test_metrics_enable_disable() {
        let mut engine = QueryEngine::new();
        assert!(engine.metrics().is_none());

        let handle = engine.enable_metrics();
        assert!(
            engine.metrics().is_some(),
            "enable_metrics must attach metrics"
        );
        assert_eq!(handle.snapshot().queries_executed, 0);

        engine.disable_metrics();
        assert!(
            engine.metrics().is_none(),
            "disable_metrics must detach metrics"
        );
    }

    /// `Default` yields an engine without metrics.
    #[test]
    fn test_default_query_engine() {
        let engine = QueryEngine::default();
        assert!(engine.metrics().is_none());
    }
}