Skip to main content

featherdb_query/executor/
mod.rs

1//! Query executor - executes logical plans
2
3mod aggregate;
4mod cte;
5mod ddl;
6mod dml;
7mod explain;
8mod filter;
9mod fk;
10mod join;
11mod project;
12mod scan;
13mod sort;
14mod subquery;
15mod types;
16mod window;
17
18// Re-export public types
19pub use types::{ExecutionContext, QueryResult, Row};
20
21use crate::planner::LogicalPlan;
22use featherdb_core::{Error, Permissions, Result};
23
24/// Query executor
25pub struct Executor;
26
27impl Executor {
28    /// Check if the current context has permission to perform an operation on a table
29    ///
30    /// This is the central permission check used by all scan and DML operations.
31    /// When auth is disabled, all operations are allowed.
32    /// When auth is enabled, the API key must have the required permission on the table.
33    pub(crate) fn check_table_permission(
34        ctx: &ExecutionContext,
35        table_name: &str,
36        required: Permissions,
37    ) -> Result<()> {
38        // If auth is not enabled, allow all operations
39        if !ctx.auth_enabled {
40            return Ok(());
41        }
42
43        // Get the API key (must exist if auth is enabled)
44        let api_key_id = ctx.api_key_id.ok_or(Error::AuthenticationRequired)?;
45
46        // Check permission via catalog
47        if !ctx
48            .catalog
49            .check_permission(table_name, api_key_id, required)
50        {
51            return Err(Error::PermissionDenied {
52                table: table_name.to_string(),
53                operation: required.to_string(),
54                api_key_id: api_key_id.to_string(),
55            });
56        }
57
58        Ok(())
59    }
60}
61
62impl Executor {
63    /// Execute a logical plan
64    pub fn execute(ctx: &ExecutionContext, plan: &LogicalPlan) -> Result<QueryResult> {
65        match plan {
66            LogicalPlan::Scan {
67                table,
68                alias,
69                projection,
70                filter,
71            } => Self::execute_scan(
72                ctx,
73                table,
74                alias.as_deref(),
75                projection.as_deref(),
76                filter.as_ref(),
77            ),
78
79            LogicalPlan::IndexScan {
80                table,
81                alias,
82                index,
83                index_column,
84                range,
85                residual_filter,
86                projection,
87            } => Self::execute_index_scan(
88                ctx,
89                table,
90                alias.as_deref(),
91                index,
92                *index_column,
93                range,
94                residual_filter.as_ref(),
95                projection.as_deref(),
96            ),
97
98            LogicalPlan::PkSeek {
99                table,
100                alias,
101                key_values,
102                residual_filter,
103                projection,
104            } => Self::execute_pk_seek(
105                ctx,
106                table,
107                alias.as_deref(),
108                key_values,
109                residual_filter.as_ref(),
110                projection.as_deref(),
111            ),
112
113            LogicalPlan::PkRangeScan {
114                table,
115                alias,
116                range,
117                residual_filter,
118                projection,
119            } => Self::execute_pk_range_scan(
120                ctx,
121                table,
122                alias.as_deref(),
123                range,
124                residual_filter.as_ref(),
125                projection.as_deref(),
126            ),
127
128            LogicalPlan::Filter { input, predicate } => Self::execute_filter(ctx, input, predicate),
129
130            LogicalPlan::Project { input, exprs } => Self::execute_project(ctx, input, exprs),
131
132            LogicalPlan::Join {
133                left,
134                right,
135                join_type,
136                condition,
137            } => Self::execute_join(ctx, left, right, *join_type, condition.as_ref()),
138
139            LogicalPlan::Aggregate {
140                input,
141                group_by,
142                aggregates,
143            } => Self::execute_aggregate(ctx, input, group_by, aggregates),
144
145            LogicalPlan::Sort { input, order_by } => Self::execute_sort(ctx, input, order_by),
146
147            LogicalPlan::Limit {
148                input,
149                limit,
150                offset,
151            } => Self::execute_limit(ctx, input, *limit, *offset),
152
153            LogicalPlan::Distinct { input } => Self::execute_distinct(ctx, input),
154
155            LogicalPlan::Insert {
156                table,
157                columns,
158                values,
159            } => Self::execute_insert(ctx, table, columns, values),
160
161            LogicalPlan::Update {
162                table,
163                assignments,
164                filter,
165            } => Self::execute_update(ctx, table, assignments, filter.as_ref()),
166
167            LogicalPlan::Delete { table, filter } => {
168                Self::execute_delete(ctx, table, filter.as_ref())
169            }
170
171            LogicalPlan::CreateTable { table } => Self::execute_create_table(ctx, table),
172
173            LogicalPlan::DropTable { name, if_exists } => {
174                Self::execute_drop_table(ctx, name, *if_exists)
175            }
176
177            LogicalPlan::EmptyRelation => Ok(QueryResult::Rows(vec![Row::new(vec![], vec![])])),
178
179            LogicalPlan::Explain {
180                input,
181                verbose,
182                analyze,
183            } => Self::execute_explain(ctx, input, *verbose, *analyze),
184
185            LogicalPlan::AlterTable {
186                table_name,
187                operations,
188            } => Self::execute_alter_table(ctx, table_name, operations),
189
190            LogicalPlan::Window {
191                input,
192                window_exprs,
193            } => Self::execute_window(ctx, input, window_exprs),
194
195            LogicalPlan::WithCte { ctes, query } => Self::execute_with_cte(ctx, ctes, query),
196
197            LogicalPlan::CteScan {
198                cte_name,
199                alias,
200                columns: _,
201            } => {
202                // Retrieve materialized CTE results from context
203                match ctx.cte_context.get(cte_name) {
204                    Some(rows) => {
205                        // If there's an alias, re-prefix all column names
206                        let prefix = alias.as_ref().unwrap_or(cte_name);
207                        let re_prefixed: Vec<Row> = rows
208                            .iter()
209                            .map(|row| {
210                                let new_cols: Vec<String> = row
211                                    .columns
212                                    .iter()
213                                    .map(|c| {
214                                        let bare = c.split('.').next_back().unwrap_or(c);
215                                        format!("{}.{}", prefix, bare)
216                                    })
217                                    .collect();
218                                Row::new(row.values.clone(), new_cols)
219                            })
220                            .collect();
221                        Ok(QueryResult::Rows(re_prefixed))
222                    }
223                    None => Err(Error::Internal(format!(
224                        "CTE '{}' referenced but not in scope",
225                        cte_name
226                    ))),
227                }
228            }
229
230            LogicalPlan::SubqueryScan {
231                subquery, alias, ..
232            } => Self::execute_subquery_scan(ctx, subquery, alias.as_deref()),
233
234            LogicalPlan::SemiJoin {
235                left,
236                right,
237                condition,
238                positive,
239            } => Self::execute_semi_join(ctx, left, right, condition.as_ref(), *positive),
240
241            LogicalPlan::AntiJoin {
242                left,
243                right,
244                condition,
245            } => Self::execute_anti_join(ctx, left, right, condition.as_ref()),
246
247            LogicalPlan::Grant {
248                privileges,
249                table,
250                grantee,
251            } => Self::execute_grant(ctx, *privileges, table.as_deref(), grantee),
252
253            LogicalPlan::Revoke {
254                privileges,
255                table,
256                grantee,
257            } => Self::execute_revoke(ctx, *privileges, table.as_deref(), grantee),
258
259            LogicalPlan::ShowGrants { target } => Self::execute_show_grants(ctx, target),
260
261            LogicalPlan::Union { left, right, all } => Self::execute_union(ctx, left, right, *all),
262
263            LogicalPlan::Intersect { left, right, all } => {
264                Self::execute_intersect(ctx, left, right, *all)
265            }
266
267            LogicalPlan::Except { left, right, all } => {
268                Self::execute_except(ctx, left, right, *all)
269            }
270
271            LogicalPlan::CreateView {
272                name,
273                query_sql,
274                columns,
275                ..
276            } => Self::execute_create_view(ctx, name, query_sql, columns),
277
278            LogicalPlan::DropView { name, if_exists } => {
279                Self::execute_drop_view(ctx, name, *if_exists)
280            }
281
282            LogicalPlan::CreateIndex {
283                index_name,
284                table_name,
285                columns,
286                unique,
287            } => Self::execute_create_index(ctx, index_name, table_name, columns, *unique),
288        }
289    }
290
291    /// Execute a logical plan and track execution time metrics
292    ///
293    /// This wraps the execute method to record execution time when metrics are enabled.
294    pub fn execute_with_timing(ctx: &ExecutionContext, plan: &LogicalPlan) -> Result<QueryResult> {
295        if let Some(metrics) = &ctx.metrics {
296            let start = std::time::Instant::now();
297            let result = Self::execute(ctx, plan)?;
298            let elapsed = start.elapsed().as_micros() as u64;
299
300            metrics.record_exec_time(elapsed);
301
302            // Track rows returned for SELECT queries
303            if let QueryResult::Rows(ref rows) = result {
304                metrics.record_rows_returned(rows.len() as u64);
305            }
306
307            Ok(result)
308        } else {
309            // No metrics, just execute normally
310            Self::execute(ctx, plan)
311        }
312    }
313}