1use std::sync::Arc;
7
8use crate::plan::physical::PhysicalPlan;
9
10use super::context::ExecutionContext;
11use super::operator::{BoxedOperator, OperatorResult, OperatorState};
12use super::operators::{
13 aggregate::{HashAggregateOp, SortMergeAggregateOp},
14 filter::FilterOp,
15 graph::{GraphExpandOp, GraphPathScanOp},
16 join::{HashJoinOp, MergeJoinOp, NestedLoopJoinOp},
17 limit::LimitOp,
18 project::ProjectOp,
19 scan::{FullScanOp, IndexRangeScanOp, IndexScanOp},
20 set_ops::{SetOpOp, UnionOp},
21 sort::SortOp,
22 values::{EmptyOp, ValuesOp},
23 vector::{BruteForceSearchOp, HnswSearchOp, HybridSearchOp},
24};
25use super::result::{QueryResult, ResultSet, ResultSetBuilder};
26use super::row::{Row, Schema};
27
28pub struct Executor {
33 root: BoxedOperator,
35 ctx: ExecutionContext,
37 opened: bool,
39}
40
41impl Executor {
42 pub fn new(plan: &PhysicalPlan, ctx: ExecutionContext) -> OperatorResult<Self> {
44 let root = build_operator_tree(plan)?;
45 Ok(Self { root, ctx, opened: false })
46 }
47
48 #[must_use]
50 pub fn schema(&self) -> Arc<Schema> {
51 self.root.schema()
52 }
53
54 pub fn open(&mut self) -> OperatorResult<()> {
56 if !self.opened {
57 self.root.open(&self.ctx)?;
58 self.opened = true;
59 }
60 Ok(())
61 }
62
63 pub fn next(&mut self) -> OperatorResult<Option<Row>> {
65 if !self.opened {
66 self.open()?;
67 }
68
69 if self.ctx.is_cancelled() {
71 return Ok(None);
72 }
73
74 let row = self.root.next()?;
75
76 if row.is_some() {
78 self.ctx.record_rows_produced(1);
79 }
80
81 Ok(row)
82 }
83
84 pub fn close(&mut self) -> OperatorResult<()> {
86 if self.opened {
87 self.root.close()?;
88 self.opened = false;
89 }
90 Ok(())
91 }
92
93 #[must_use]
95 pub fn context(&self) -> &ExecutionContext {
96 &self.ctx
97 }
98
99 #[must_use]
101 pub fn state(&self) -> OperatorState {
102 self.root.state()
103 }
104
105 pub fn execute(&mut self) -> OperatorResult<QueryResult> {
107 self.open()?;
108
109 let schema = self.root.schema();
110 let mut builder = ResultSetBuilder::new(schema);
111
112 while let Some(row) = self.next()? {
113 builder.push(row);
114 }
115
116 self.close()?;
117
118 Ok(QueryResult::select(builder.build()))
119 }
120
121 pub fn collect(&mut self) -> OperatorResult<Vec<Row>> {
123 self.open()?;
124
125 let mut rows = Vec::new();
126 while let Some(row) = self.next()? {
127 rows.push(row);
128 }
129
130 self.close()?;
131 Ok(rows)
132 }
133
134 pub fn count(&mut self) -> OperatorResult<usize> {
136 self.open()?;
137
138 let mut count = 0;
139 while self.next()?.is_some() {
140 count += 1;
141 }
142
143 self.close()?;
144 Ok(count)
145 }
146
147 pub fn first(&mut self) -> OperatorResult<Option<Row>> {
149 self.open()?;
150 let row = self.next()?;
151 self.close()?;
152 Ok(row)
153 }
154}
155
156fn build_operator_tree(plan: &PhysicalPlan) -> OperatorResult<BoxedOperator> {
158 match plan {
159 PhysicalPlan::FullScan(node) => Ok(Box::new(FullScanOp::new((**node).clone()))),
161
162 PhysicalPlan::IndexScan(node) => Ok(Box::new(IndexScanOp::new((**node).clone()))),
163
164 PhysicalPlan::IndexRangeScan(node) => Ok(Box::new(IndexRangeScanOp::new((**node).clone()))),
165
166 PhysicalPlan::Values { rows, .. } => {
167 let schema = Arc::new(Schema::new(
170 (0..rows.first().map_or(0, |r| r.len())).map(|i| format!("col_{i}")).collect(),
171 ));
172 Ok(Box::new(ValuesOp::new(schema, Vec::new())))
173 }
174
175 PhysicalPlan::Empty { columns } => Ok(Box::new(EmptyOp::with_columns(columns.clone()))),
176
177 PhysicalPlan::Filter { node, input } => {
179 let input_op = build_operator_tree(input)?;
180 Ok(Box::new(FilterOp::new(node.predicate.clone(), input_op)))
181 }
182
183 PhysicalPlan::Project { node, input } => {
184 let input_op = build_operator_tree(input)?;
185 Ok(Box::new(ProjectOp::new(node.exprs.clone(), input_op)))
186 }
187
188 PhysicalPlan::Sort { node, input } => {
189 let input_op = build_operator_tree(input)?;
190 Ok(Box::new(SortOp::new(node.order_by.clone(), input_op)))
191 }
192
193 PhysicalPlan::Limit { node, input } => {
194 let input_op = build_operator_tree(input)?;
195 Ok(Box::new(LimitOp::new(node.limit, node.offset, input_op)))
196 }
197
198 PhysicalPlan::HashDistinct { on_columns, input, .. } => {
199 let input_op = build_operator_tree(input)?;
201 let group_by = on_columns.clone().unwrap_or_default();
202 Ok(Box::new(HashAggregateOp::new(group_by, vec![], None, input_op)))
203 }
204
205 PhysicalPlan::HashAggregate { node, input } => {
206 let input_op = build_operator_tree(input)?;
207 Ok(Box::new(HashAggregateOp::new(
208 node.group_by.clone(),
209 node.aggregates.clone(),
210 node.having.clone(),
211 input_op,
212 )))
213 }
214
215 PhysicalPlan::SortMergeAggregate { node, input } => {
216 let input_op = build_operator_tree(input)?;
217 Ok(Box::new(SortMergeAggregateOp::new(
218 node.group_by.clone(),
219 node.aggregates.clone(),
220 node.having.clone(),
221 input_op,
222 )))
223 }
224
225 PhysicalPlan::NestedLoopJoin { node, left, right } => {
227 let left_op = build_operator_tree(left)?;
228 let right_op = build_operator_tree(right)?;
229 Ok(Box::new(NestedLoopJoinOp::new(
230 node.join_type,
231 node.condition.clone(),
232 left_op,
233 right_op,
234 )))
235 }
236
237 PhysicalPlan::HashJoin { node, build, probe } => {
238 let build_op = build_operator_tree(build)?;
239 let probe_op = build_operator_tree(probe)?;
240 Ok(Box::new(HashJoinOp::new(
241 node.join_type,
242 node.build_keys.clone(),
243 node.probe_keys.clone(),
244 node.filter.clone(),
245 build_op,
246 probe_op,
247 )))
248 }
249
250 PhysicalPlan::MergeJoin { node, left, right } => {
251 let left_op = build_operator_tree(left)?;
252 let right_op = build_operator_tree(right)?;
253 Ok(Box::new(MergeJoinOp::new(
254 node.join_type,
255 node.left_keys.clone(),
256 node.right_keys.clone(),
257 left_op,
258 right_op,
259 )))
260 }
261
262 PhysicalPlan::SetOp { op_type, left, right, .. } => {
264 let left_op = build_operator_tree(left)?;
265 let right_op = build_operator_tree(right)?;
266 Ok(Box::new(SetOpOp::new(*op_type, left_op, right_op)))
267 }
268
269 PhysicalPlan::Union { all, inputs, .. } => {
270 if inputs.is_empty() {
271 let schema = Arc::new(Schema::empty());
272 return Ok(Box::new(EmptyOp::new(schema)));
273 }
274
275 let input_ops: Vec<BoxedOperator> =
276 inputs.iter().map(build_operator_tree).collect::<Result<_, _>>()?;
277 Ok(Box::new(UnionOp::new(input_ops, *all)))
278 }
279
280 PhysicalPlan::HnswSearch { node, input } => {
282 let input_op = build_operator_tree(input)?;
283 Ok(Box::new(HnswSearchOp::with_index(
284 node.index_name.clone(),
285 node.vector_column.clone(),
286 node.query_vector.clone(),
287 node.metric,
288 node.k,
289 node.ef_search,
290 node.include_distance,
291 node.distance_alias.clone(),
292 input_op,
293 )))
294 }
295
296 PhysicalPlan::BruteForceSearch { node, input } => {
297 let input_op = build_operator_tree(input)?;
298 Ok(Box::new(BruteForceSearchOp::new(
299 node.vector_column.clone(),
300 node.query_vector.clone(),
301 node.metric,
302 node.k,
303 node.include_distance,
304 node.distance_alias.clone(),
305 input_op,
306 )))
307 }
308
309 PhysicalPlan::HybridSearch { node, input } => {
310 let input_op = build_operator_tree(input)?;
311 Ok(Box::new(HybridSearchOp::new(
312 node.components.clone(),
313 node.k,
314 node.combination_method,
315 node.normalize_scores,
316 node.include_score,
317 node.score_alias.clone(),
318 input_op,
319 )))
320 }
321
322 PhysicalPlan::GraphExpand { node, input } => {
324 let input_op = build_operator_tree(input)?;
325 Ok(Box::new(GraphExpandOp::new((**node).clone(), input_op)))
326 }
327
328 PhysicalPlan::GraphPathScan { node, input } => {
329 let input_op = build_operator_tree(input)?;
330 Ok(Box::new(GraphPathScanOp::new(
331 node.steps.clone(),
332 node.all_paths,
333 node.track_path,
334 input_op,
335 )))
336 }
337
338 PhysicalPlan::Insert { columns, .. } => {
340 Ok(Box::new(EmptyOp::with_columns(columns.clone())))
341 }
342
343 PhysicalPlan::Update { .. } => Ok(Box::new(EmptyOp::with_columns(vec![]))),
344
345 PhysicalPlan::Delete { .. } => Ok(Box::new(EmptyOp::with_columns(vec![]))),
346
347 PhysicalPlan::CreateTable(_)
349 | PhysicalPlan::DropTable(_)
350 | PhysicalPlan::CreateIndex(_)
351 | PhysicalPlan::DropIndex(_)
352 | PhysicalPlan::CreateCollection(_)
353 | PhysicalPlan::DropCollection(_) => Ok(Box::new(EmptyOp::with_columns(vec![]))),
354 }
355}
356
357pub fn execute_plan(plan: &PhysicalPlan) -> OperatorResult<ResultSet> {
361 let ctx = ExecutionContext::new();
362 let mut executor = Executor::new(plan, ctx)?;
363 let result = executor.execute()?;
364 result
365 .into_select()
366 .ok_or_else(|| crate::error::ParseError::Unsupported("Expected SELECT result".to_string()))
367}
368
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plan::logical::{LogicalExpr, SetOpType, SortOrder};
    use crate::plan::physical::{
        Cost, FilterExecNode, FullScanNode, LimitExecNode, ProjectExecNode, SortExecNode,
    };

    // ---- Fixtures -------------------------------------------------------

    /// Full scan over `users` projecting `id` and `name`.
    fn make_scan_plan() -> PhysicalPlan {
        let scan =
            FullScanNode::new("users").with_projection(vec!["id".to_string(), "name".to_string()]);
        PhysicalPlan::FullScan(Box::new(scan))
    }

    /// Empty plan exposing a single column called `column`.
    fn empty_plan(column: &str) -> PhysicalPlan {
        PhysicalPlan::Empty { columns: vec![column.to_string()] }
    }

    /// Executor for `plan` running under a fresh context.
    fn executor_for(plan: &PhysicalPlan) -> Executor {
        Executor::new(plan, ExecutionContext::new()).unwrap()
    }

    /// Union of two empty `x` inputs.
    fn union_of_empties(all: bool) -> PhysicalPlan {
        PhysicalPlan::Union {
            all,
            cost: Cost::default(),
            inputs: vec![empty_plan("x"), empty_plan("x")],
        }
    }

    /// Set operation of kind `op_type` over two empty `x` inputs.
    fn set_op_of_empties(op_type: SetOpType) -> PhysicalPlan {
        PhysicalPlan::SetOp {
            op_type,
            cost: Cost::default(),
            left: Box::new(empty_plan("x")),
            right: Box::new(empty_plan("x")),
        }
    }

    // ---- Basic operators ------------------------------------------------

    #[test]
    fn executor_empty() {
        let plan = empty_plan("a");
        let mut executor = executor_for(&plan);

        assert_eq!(executor.count().unwrap(), 0);
    }

    #[test]
    fn executor_filter() {
        let predicate = LogicalExpr::column("id").gt(LogicalExpr::integer(5));
        let plan = PhysicalPlan::Filter {
            node: FilterExecNode::new(predicate),
            input: Box::new(make_scan_plan()),
        };

        let executor = executor_for(&plan);

        // A filter passes its input schema through unchanged.
        assert_eq!(executor.schema().columns(), &["id", "name"]);
    }

    #[test]
    fn executor_project() {
        let plan = PhysicalPlan::Project {
            node: ProjectExecNode::new(vec![LogicalExpr::column("name")]),
            input: Box::new(make_scan_plan()),
        };

        let executor = executor_for(&plan);

        assert_eq!(executor.schema().columns(), &["name"]);
    }

    #[test]
    fn executor_limit() {
        let plan = PhysicalPlan::Limit {
            node: LimitExecNode::limit(10),
            input: Box::new(make_scan_plan()),
        };

        let executor = executor_for(&plan);

        assert_eq!(executor.schema().columns(), &["id", "name"]);
    }

    #[test]
    fn executor_sort() {
        let ordering = vec![SortOrder::asc(LogicalExpr::column("name"))];
        let plan = PhysicalPlan::Sort {
            node: SortExecNode::new(ordering),
            input: Box::new(make_scan_plan()),
        };

        let executor = executor_for(&plan);

        assert_eq!(executor.schema().columns(), &["id", "name"]);
    }

    // ---- Context behaviour ----------------------------------------------

    #[test]
    fn executor_cancellation() {
        let plan = make_scan_plan();

        // Cancel before the executor is even built.
        let ctx = ExecutionContext::new();
        ctx.cancel();

        let mut executor = Executor::new(&plan, ctx).unwrap();
        executor.open().unwrap();

        // A cancelled context makes `next` report end-of-stream.
        assert!(executor.next().unwrap().is_none());
    }

    #[test]
    fn executor_stats() {
        let plan = empty_plan("x");
        let mut executor = executor_for(&plan);

        executor.execute().unwrap();

        assert!(executor.context().stats().elapsed().as_nanos() > 0);
    }

    // ---- Set operations -------------------------------------------------

    #[test]
    fn executor_union() {
        let plan = union_of_empties(false);
        let mut executor = executor_for(&plan);

        assert_eq!(executor.count().unwrap(), 0);
    }

    #[test]
    fn executor_union_all() {
        let plan = union_of_empties(true);
        let mut executor = executor_for(&plan);

        assert_eq!(executor.count().unwrap(), 0);
    }

    #[test]
    fn executor_set_op_intersect() {
        let plan = set_op_of_empties(SetOpType::Intersect);
        let mut executor = executor_for(&plan);

        assert_eq!(executor.count().unwrap(), 0);
    }

    #[test]
    fn executor_set_op_except() {
        let plan = set_op_of_empties(SetOpType::Except);
        let mut executor = executor_for(&plan);

        assert_eq!(executor.count().unwrap(), 0);
    }

    #[test]
    fn executor_empty_union() {
        let plan = PhysicalPlan::Union { all: false, cost: Cost::default(), inputs: vec![] };
        let mut executor = executor_for(&plan);

        assert_eq!(executor.count().unwrap(), 0);
    }
}