1mod aggregate;
4mod cte;
5mod ddl;
6mod dml;
7mod explain;
8mod filter;
9mod fk;
10mod join;
11mod project;
12mod scan;
13mod sort;
14mod subquery;
15mod types;
16mod window;
17
18pub use types::{ExecutionContext, QueryResult, Row};
20
21use crate::planner::LogicalPlan;
22use featherdb_core::{Error, Permissions, Result};
23
24pub struct Executor;
26
27impl Executor {
28 pub(crate) fn check_table_permission(
34 ctx: &ExecutionContext,
35 table_name: &str,
36 required: Permissions,
37 ) -> Result<()> {
38 if !ctx.auth_enabled {
40 return Ok(());
41 }
42
43 let api_key_id = ctx.api_key_id.ok_or(Error::AuthenticationRequired)?;
45
46 if !ctx
48 .catalog
49 .check_permission(table_name, api_key_id, required)
50 {
51 return Err(Error::PermissionDenied {
52 table: table_name.to_string(),
53 operation: required.to_string(),
54 api_key_id: api_key_id.to_string(),
55 });
56 }
57
58 Ok(())
59 }
60}
61
62impl Executor {
63 pub fn execute(ctx: &ExecutionContext, plan: &LogicalPlan) -> Result<QueryResult> {
65 match plan {
66 LogicalPlan::Scan {
67 table,
68 alias,
69 projection,
70 filter,
71 } => Self::execute_scan(
72 ctx,
73 table,
74 alias.as_deref(),
75 projection.as_deref(),
76 filter.as_ref(),
77 ),
78
79 LogicalPlan::IndexScan {
80 table,
81 alias,
82 index,
83 index_column,
84 range,
85 residual_filter,
86 projection,
87 } => Self::execute_index_scan(
88 ctx,
89 table,
90 alias.as_deref(),
91 index,
92 *index_column,
93 range,
94 residual_filter.as_ref(),
95 projection.as_deref(),
96 ),
97
98 LogicalPlan::PkSeek {
99 table,
100 alias,
101 key_values,
102 residual_filter,
103 projection,
104 } => Self::execute_pk_seek(
105 ctx,
106 table,
107 alias.as_deref(),
108 key_values,
109 residual_filter.as_ref(),
110 projection.as_deref(),
111 ),
112
113 LogicalPlan::PkRangeScan {
114 table,
115 alias,
116 range,
117 residual_filter,
118 projection,
119 } => Self::execute_pk_range_scan(
120 ctx,
121 table,
122 alias.as_deref(),
123 range,
124 residual_filter.as_ref(),
125 projection.as_deref(),
126 ),
127
128 LogicalPlan::Filter { input, predicate } => Self::execute_filter(ctx, input, predicate),
129
130 LogicalPlan::Project { input, exprs } => Self::execute_project(ctx, input, exprs),
131
132 LogicalPlan::Join {
133 left,
134 right,
135 join_type,
136 condition,
137 } => Self::execute_join(ctx, left, right, *join_type, condition.as_ref()),
138
139 LogicalPlan::Aggregate {
140 input,
141 group_by,
142 aggregates,
143 } => Self::execute_aggregate(ctx, input, group_by, aggregates),
144
145 LogicalPlan::Sort { input, order_by } => Self::execute_sort(ctx, input, order_by),
146
147 LogicalPlan::Limit {
148 input,
149 limit,
150 offset,
151 } => Self::execute_limit(ctx, input, *limit, *offset),
152
153 LogicalPlan::Distinct { input } => Self::execute_distinct(ctx, input),
154
155 LogicalPlan::Insert {
156 table,
157 columns,
158 values,
159 } => Self::execute_insert(ctx, table, columns, values),
160
161 LogicalPlan::Update {
162 table,
163 assignments,
164 filter,
165 } => Self::execute_update(ctx, table, assignments, filter.as_ref()),
166
167 LogicalPlan::Delete { table, filter } => {
168 Self::execute_delete(ctx, table, filter.as_ref())
169 }
170
171 LogicalPlan::CreateTable { table } => Self::execute_create_table(ctx, table),
172
173 LogicalPlan::DropTable { name, if_exists } => {
174 Self::execute_drop_table(ctx, name, *if_exists)
175 }
176
177 LogicalPlan::EmptyRelation => Ok(QueryResult::Rows(vec![Row::new(vec![], vec![])])),
178
179 LogicalPlan::Explain {
180 input,
181 verbose,
182 analyze,
183 } => Self::execute_explain(ctx, input, *verbose, *analyze),
184
185 LogicalPlan::AlterTable {
186 table_name,
187 operations,
188 } => Self::execute_alter_table(ctx, table_name, operations),
189
190 LogicalPlan::Window {
191 input,
192 window_exprs,
193 } => Self::execute_window(ctx, input, window_exprs),
194
195 LogicalPlan::WithCte { ctes, query } => Self::execute_with_cte(ctx, ctes, query),
196
197 LogicalPlan::CteScan {
198 cte_name,
199 alias,
200 columns: _,
201 } => {
202 match ctx.cte_context.get(cte_name) {
204 Some(rows) => {
205 let prefix = alias.as_ref().unwrap_or(cte_name);
207 let re_prefixed: Vec<Row> = rows
208 .iter()
209 .map(|row| {
210 let new_cols: Vec<String> = row
211 .columns
212 .iter()
213 .map(|c| {
214 let bare = c.split('.').next_back().unwrap_or(c);
215 format!("{}.{}", prefix, bare)
216 })
217 .collect();
218 Row::new(row.values.clone(), new_cols)
219 })
220 .collect();
221 Ok(QueryResult::Rows(re_prefixed))
222 }
223 None => Err(Error::Internal(format!(
224 "CTE '{}' referenced but not in scope",
225 cte_name
226 ))),
227 }
228 }
229
230 LogicalPlan::SubqueryScan {
231 subquery, alias, ..
232 } => Self::execute_subquery_scan(ctx, subquery, alias.as_deref()),
233
234 LogicalPlan::SemiJoin {
235 left,
236 right,
237 condition,
238 positive,
239 } => Self::execute_semi_join(ctx, left, right, condition.as_ref(), *positive),
240
241 LogicalPlan::AntiJoin {
242 left,
243 right,
244 condition,
245 } => Self::execute_anti_join(ctx, left, right, condition.as_ref()),
246
247 LogicalPlan::Grant {
248 privileges,
249 table,
250 grantee,
251 } => Self::execute_grant(ctx, *privileges, table.as_deref(), grantee),
252
253 LogicalPlan::Revoke {
254 privileges,
255 table,
256 grantee,
257 } => Self::execute_revoke(ctx, *privileges, table.as_deref(), grantee),
258
259 LogicalPlan::ShowGrants { target } => Self::execute_show_grants(ctx, target),
260
261 LogicalPlan::Union { left, right, all } => Self::execute_union(ctx, left, right, *all),
262
263 LogicalPlan::Intersect { left, right, all } => {
264 Self::execute_intersect(ctx, left, right, *all)
265 }
266
267 LogicalPlan::Except { left, right, all } => {
268 Self::execute_except(ctx, left, right, *all)
269 }
270
271 LogicalPlan::CreateView {
272 name,
273 query_sql,
274 columns,
275 ..
276 } => Self::execute_create_view(ctx, name, query_sql, columns),
277
278 LogicalPlan::DropView { name, if_exists } => {
279 Self::execute_drop_view(ctx, name, *if_exists)
280 }
281
282 LogicalPlan::CreateIndex {
283 index_name,
284 table_name,
285 columns,
286 unique,
287 } => Self::execute_create_index(ctx, index_name, table_name, columns, *unique),
288 }
289 }
290
291 pub fn execute_with_timing(ctx: &ExecutionContext, plan: &LogicalPlan) -> Result<QueryResult> {
295 if let Some(metrics) = &ctx.metrics {
296 let start = std::time::Instant::now();
297 let result = Self::execute(ctx, plan)?;
298 let elapsed = start.elapsed().as_micros() as u64;
299
300 metrics.record_exec_time(elapsed);
301
302 if let QueryResult::Rows(ref rows) = result {
304 metrics.record_rows_returned(rows.len() as u64);
305 }
306
307 Ok(result)
308 } else {
309 Self::execute(ctx, plan)
311 }
312 }
313}