manifoldb_query/exec/
executor.rs

//! Main query executor.
//!
//! This module provides the [`Executor`], which builds an operator tree
//! from a physical plan and runs it to produce rows.
5
6use std::sync::Arc;
7
8use crate::plan::physical::PhysicalPlan;
9
10use super::context::ExecutionContext;
11use super::operator::{BoxedOperator, OperatorResult, OperatorState};
12use super::operators::{
13    aggregate::{HashAggregateOp, SortMergeAggregateOp},
14    filter::FilterOp,
15    graph::{GraphExpandOp, GraphPathScanOp},
16    join::{HashJoinOp, MergeJoinOp, NestedLoopJoinOp},
17    limit::LimitOp,
18    project::ProjectOp,
19    scan::{FullScanOp, IndexRangeScanOp, IndexScanOp},
20    set_ops::{SetOpOp, UnionOp},
21    sort::SortOp,
22    values::{EmptyOp, ValuesOp},
23    vector::{BruteForceSearchOp, HnswSearchOp, HybridSearchOp},
24};
25use super::result::{QueryResult, ResultSet, ResultSetBuilder};
26use super::row::{Row, Schema};
27
/// The main query executor.
///
/// Owns the root of an operator tree built from a [`PhysicalPlan`] together
/// with the [`ExecutionContext`] it runs under, and drives that tree through
/// its open / next / close lifecycle to produce results.
pub struct Executor {
    /// Root of the operator tree; every result row is pulled from here.
    root: BoxedOperator,
    /// Context handed to the root operator on `open`, and consulted for
    /// cancellation and row-count statistics while rows are produced.
    ctx: ExecutionContext,
    /// Whether `root` is currently open (set by `open`, cleared by `close`).
    opened: bool,
}
40
41impl Executor {
42    /// Creates a new executor for the given physical plan.
43    pub fn new(plan: &PhysicalPlan, ctx: ExecutionContext) -> OperatorResult<Self> {
44        let root = build_operator_tree(plan)?;
45        Ok(Self { root, ctx, opened: false })
46    }
47
48    /// Returns the output schema.
49    #[must_use]
50    pub fn schema(&self) -> Arc<Schema> {
51        self.root.schema()
52    }
53
54    /// Opens the executor and prepares it to produce rows.
55    pub fn open(&mut self) -> OperatorResult<()> {
56        if !self.opened {
57            self.root.open(&self.ctx)?;
58            self.opened = true;
59        }
60        Ok(())
61    }
62
63    /// Returns the next row, or `None` if there are no more rows.
64    pub fn next(&mut self) -> OperatorResult<Option<Row>> {
65        if !self.opened {
66            self.open()?;
67        }
68
69        // Check for cancellation
70        if self.ctx.is_cancelled() {
71            return Ok(None);
72        }
73
74        let row = self.root.next()?;
75
76        // Update stats
77        if row.is_some() {
78            self.ctx.record_rows_produced(1);
79        }
80
81        Ok(row)
82    }
83
84    /// Closes the executor and releases resources.
85    pub fn close(&mut self) -> OperatorResult<()> {
86        if self.opened {
87            self.root.close()?;
88            self.opened = false;
89        }
90        Ok(())
91    }
92
93    /// Returns the execution context.
94    #[must_use]
95    pub fn context(&self) -> &ExecutionContext {
96        &self.ctx
97    }
98
99    /// Returns the current state.
100    #[must_use]
101    pub fn state(&self) -> OperatorState {
102        self.root.state()
103    }
104
105    /// Executes the query and collects all results.
106    pub fn execute(&mut self) -> OperatorResult<QueryResult> {
107        self.open()?;
108
109        let schema = self.root.schema();
110        let mut builder = ResultSetBuilder::new(schema);
111
112        while let Some(row) = self.next()? {
113            builder.push(row);
114        }
115
116        self.close()?;
117
118        Ok(QueryResult::select(builder.build()))
119    }
120
121    /// Executes and returns just the rows as a vector.
122    pub fn collect(&mut self) -> OperatorResult<Vec<Row>> {
123        self.open()?;
124
125        let mut rows = Vec::new();
126        while let Some(row) = self.next()? {
127            rows.push(row);
128        }
129
130        self.close()?;
131        Ok(rows)
132    }
133
134    /// Counts the number of result rows without materializing them.
135    pub fn count(&mut self) -> OperatorResult<usize> {
136        self.open()?;
137
138        let mut count = 0;
139        while self.next()?.is_some() {
140            count += 1;
141        }
142
143        self.close()?;
144        Ok(count)
145    }
146
147    /// Returns the first row if any.
148    pub fn first(&mut self) -> OperatorResult<Option<Row>> {
149        self.open()?;
150        let row = self.next()?;
151        self.close()?;
152        Ok(row)
153    }
154}
155
156/// Builds an operator tree from a physical plan.
157fn build_operator_tree(plan: &PhysicalPlan) -> OperatorResult<BoxedOperator> {
158    match plan {
159        // Scan operations
160        PhysicalPlan::FullScan(node) => Ok(Box::new(FullScanOp::new((**node).clone()))),
161
162        PhysicalPlan::IndexScan(node) => Ok(Box::new(IndexScanOp::new((**node).clone()))),
163
164        PhysicalPlan::IndexRangeScan(node) => Ok(Box::new(IndexRangeScanOp::new((**node).clone()))),
165
166        PhysicalPlan::Values { rows, .. } => {
167            // Convert LogicalExpr rows to Value rows
168            // For now, use empty schema - actual evaluation would happen here
169            let schema = Arc::new(Schema::new(
170                (0..rows.first().map_or(0, |r| r.len())).map(|i| format!("col_{i}")).collect(),
171            ));
172            Ok(Box::new(ValuesOp::new(schema, Vec::new())))
173        }
174
175        PhysicalPlan::Empty { columns } => Ok(Box::new(EmptyOp::with_columns(columns.clone()))),
176
177        // Unary operators
178        PhysicalPlan::Filter { node, input } => {
179            let input_op = build_operator_tree(input)?;
180            Ok(Box::new(FilterOp::new(node.predicate.clone(), input_op)))
181        }
182
183        PhysicalPlan::Project { node, input } => {
184            let input_op = build_operator_tree(input)?;
185            Ok(Box::new(ProjectOp::new(node.exprs.clone(), input_op)))
186        }
187
188        PhysicalPlan::Sort { node, input } => {
189            let input_op = build_operator_tree(input)?;
190            Ok(Box::new(SortOp::new(node.order_by.clone(), input_op)))
191        }
192
193        PhysicalPlan::Limit { node, input } => {
194            let input_op = build_operator_tree(input)?;
195            Ok(Box::new(LimitOp::new(node.limit, node.offset, input_op)))
196        }
197
198        PhysicalPlan::HashDistinct { on_columns, input, .. } => {
199            // Implement as a hash aggregate with just grouping columns
200            let input_op = build_operator_tree(input)?;
201            let group_by = on_columns.clone().unwrap_or_default();
202            Ok(Box::new(HashAggregateOp::new(group_by, vec![], None, input_op)))
203        }
204
205        PhysicalPlan::HashAggregate { node, input } => {
206            let input_op = build_operator_tree(input)?;
207            Ok(Box::new(HashAggregateOp::new(
208                node.group_by.clone(),
209                node.aggregates.clone(),
210                node.having.clone(),
211                input_op,
212            )))
213        }
214
215        PhysicalPlan::SortMergeAggregate { node, input } => {
216            let input_op = build_operator_tree(input)?;
217            Ok(Box::new(SortMergeAggregateOp::new(
218                node.group_by.clone(),
219                node.aggregates.clone(),
220                node.having.clone(),
221                input_op,
222            )))
223        }
224
225        // Join operators
226        PhysicalPlan::NestedLoopJoin { node, left, right } => {
227            let left_op = build_operator_tree(left)?;
228            let right_op = build_operator_tree(right)?;
229            Ok(Box::new(NestedLoopJoinOp::new(
230                node.join_type,
231                node.condition.clone(),
232                left_op,
233                right_op,
234            )))
235        }
236
237        PhysicalPlan::HashJoin { node, build, probe } => {
238            let build_op = build_operator_tree(build)?;
239            let probe_op = build_operator_tree(probe)?;
240            Ok(Box::new(HashJoinOp::new(
241                node.join_type,
242                node.build_keys.clone(),
243                node.probe_keys.clone(),
244                node.filter.clone(),
245                build_op,
246                probe_op,
247            )))
248        }
249
250        PhysicalPlan::MergeJoin { node, left, right } => {
251            let left_op = build_operator_tree(left)?;
252            let right_op = build_operator_tree(right)?;
253            Ok(Box::new(MergeJoinOp::new(
254                node.join_type,
255                node.left_keys.clone(),
256                node.right_keys.clone(),
257                left_op,
258                right_op,
259            )))
260        }
261
262        // Set operations
263        PhysicalPlan::SetOp { op_type, left, right, .. } => {
264            let left_op = build_operator_tree(left)?;
265            let right_op = build_operator_tree(right)?;
266            Ok(Box::new(SetOpOp::new(*op_type, left_op, right_op)))
267        }
268
269        PhysicalPlan::Union { all, inputs, .. } => {
270            if inputs.is_empty() {
271                let schema = Arc::new(Schema::empty());
272                return Ok(Box::new(EmptyOp::new(schema)));
273            }
274
275            let input_ops: Vec<BoxedOperator> =
276                inputs.iter().map(build_operator_tree).collect::<Result<_, _>>()?;
277            Ok(Box::new(UnionOp::new(input_ops, *all)))
278        }
279
280        // Vector operations
281        PhysicalPlan::HnswSearch { node, input } => {
282            let input_op = build_operator_tree(input)?;
283            Ok(Box::new(HnswSearchOp::with_index(
284                node.index_name.clone(),
285                node.vector_column.clone(),
286                node.query_vector.clone(),
287                node.metric,
288                node.k,
289                node.ef_search,
290                node.include_distance,
291                node.distance_alias.clone(),
292                input_op,
293            )))
294        }
295
296        PhysicalPlan::BruteForceSearch { node, input } => {
297            let input_op = build_operator_tree(input)?;
298            Ok(Box::new(BruteForceSearchOp::new(
299                node.vector_column.clone(),
300                node.query_vector.clone(),
301                node.metric,
302                node.k,
303                node.include_distance,
304                node.distance_alias.clone(),
305                input_op,
306            )))
307        }
308
309        PhysicalPlan::HybridSearch { node, input } => {
310            let input_op = build_operator_tree(input)?;
311            Ok(Box::new(HybridSearchOp::new(
312                node.components.clone(),
313                node.k,
314                node.combination_method,
315                node.normalize_scores,
316                node.include_score,
317                node.score_alias.clone(),
318                input_op,
319            )))
320        }
321
322        // Graph operations
323        PhysicalPlan::GraphExpand { node, input } => {
324            let input_op = build_operator_tree(input)?;
325            Ok(Box::new(GraphExpandOp::new((**node).clone(), input_op)))
326        }
327
328        PhysicalPlan::GraphPathScan { node, input } => {
329            let input_op = build_operator_tree(input)?;
330            Ok(Box::new(GraphPathScanOp::new(
331                node.steps.clone(),
332                node.all_paths,
333                node.track_path,
334                input_op,
335            )))
336        }
337
338        // DML operations (not fully implemented)
339        PhysicalPlan::Insert { columns, .. } => {
340            Ok(Box::new(EmptyOp::with_columns(columns.clone())))
341        }
342
343        PhysicalPlan::Update { .. } => Ok(Box::new(EmptyOp::with_columns(vec![]))),
344
345        PhysicalPlan::Delete { .. } => Ok(Box::new(EmptyOp::with_columns(vec![]))),
346
347        // DDL operations are handled at a higher level, not as operators
348        PhysicalPlan::CreateTable(_)
349        | PhysicalPlan::DropTable(_)
350        | PhysicalPlan::CreateIndex(_)
351        | PhysicalPlan::DropIndex(_)
352        | PhysicalPlan::CreateCollection(_)
353        | PhysicalPlan::DropCollection(_) => Ok(Box::new(EmptyOp::with_columns(vec![]))),
354    }
355}
356
357/// Convenience function to execute a plan and get results.
358///
359/// Creates a default execution context and runs the plan to completion.
360pub fn execute_plan(plan: &PhysicalPlan) -> OperatorResult<ResultSet> {
361    let ctx = ExecutionContext::new();
362    let mut executor = Executor::new(plan, ctx)?;
363    let result = executor.execute()?;
364    result
365        .into_select()
366        .ok_or_else(|| crate::error::ParseError::Unsupported("Expected SELECT result".to_string()))
367}
368
369#[cfg(test)]
370mod tests {
371    use super::*;
372    use crate::plan::logical::LogicalExpr;
373    use crate::plan::logical::SortOrder;
374    use crate::plan::physical::{
375        FilterExecNode, FullScanNode, LimitExecNode, ProjectExecNode, SortExecNode,
376    };
377
378    fn make_scan_plan() -> PhysicalPlan {
379        PhysicalPlan::FullScan(Box::new(
380            FullScanNode::new("users").with_projection(vec!["id".to_string(), "name".to_string()]),
381        ))
382    }
383
384    #[test]
385    fn executor_empty() {
386        let plan = PhysicalPlan::Empty { columns: vec!["a".to_string()] };
387
388        let ctx = ExecutionContext::new();
389        let mut executor = Executor::new(&plan, ctx).unwrap();
390
391        assert_eq!(executor.count().unwrap(), 0);
392    }
393
394    #[test]
395    fn executor_filter() {
396        let scan = make_scan_plan();
397        let plan = PhysicalPlan::Filter {
398            node: FilterExecNode::new(LogicalExpr::column("id").gt(LogicalExpr::integer(5))),
399            input: Box::new(scan),
400        };
401
402        let ctx = ExecutionContext::new();
403        let executor = Executor::new(&plan, ctx).unwrap();
404
405        // Just verify it builds
406        assert_eq!(executor.schema().columns(), &["id", "name"]);
407    }
408
409    #[test]
410    fn executor_project() {
411        let scan = make_scan_plan();
412        let plan = PhysicalPlan::Project {
413            node: ProjectExecNode::new(vec![LogicalExpr::column("name")]),
414            input: Box::new(scan),
415        };
416
417        let ctx = ExecutionContext::new();
418        let executor = Executor::new(&plan, ctx).unwrap();
419
420        assert_eq!(executor.schema().columns(), &["name"]);
421    }
422
423    #[test]
424    fn executor_limit() {
425        let scan = make_scan_plan();
426        let plan = PhysicalPlan::Limit { node: LimitExecNode::limit(10), input: Box::new(scan) };
427
428        let ctx = ExecutionContext::new();
429        let executor = Executor::new(&plan, ctx).unwrap();
430
431        assert_eq!(executor.schema().columns(), &["id", "name"]);
432    }
433
434    #[test]
435    fn executor_sort() {
436        let scan = make_scan_plan();
437        let plan = PhysicalPlan::Sort {
438            node: SortExecNode::new(vec![SortOrder::asc(LogicalExpr::column("name"))]),
439            input: Box::new(scan),
440        };
441
442        let ctx = ExecutionContext::new();
443        let executor = Executor::new(&plan, ctx).unwrap();
444
445        assert_eq!(executor.schema().columns(), &["id", "name"]);
446    }
447
448    #[test]
449    fn executor_cancellation() {
450        let plan = make_scan_plan();
451        let ctx = ExecutionContext::new();
452        ctx.cancel();
453
454        let mut executor = Executor::new(&plan, ctx).unwrap();
455        executor.open().unwrap();
456
457        // Should return None due to cancellation
458        assert!(executor.next().unwrap().is_none());
459    }
460
461    #[test]
462    fn executor_stats() {
463        let plan = PhysicalPlan::Empty { columns: vec!["x".to_string()] };
464
465        let ctx = ExecutionContext::new();
466        let mut executor = Executor::new(&plan, ctx).unwrap();
467        executor.execute().unwrap();
468
469        assert!(executor.context().stats().elapsed().as_nanos() > 0);
470    }
471
472    #[test]
473    fn executor_union() {
474        use crate::plan::physical::Cost;
475
476        // Union of two empty sources
477        let plan = PhysicalPlan::Union {
478            all: false,
479            cost: Cost::default(),
480            inputs: vec![
481                PhysicalPlan::Empty { columns: vec!["x".to_string()] },
482                PhysicalPlan::Empty { columns: vec!["x".to_string()] },
483            ],
484        };
485
486        let ctx = ExecutionContext::new();
487        let mut executor = Executor::new(&plan, ctx).unwrap();
488        assert_eq!(executor.count().unwrap(), 0);
489    }
490
491    #[test]
492    fn executor_union_all() {
493        use crate::plan::physical::Cost;
494
495        // Union ALL of empty sources
496        let plan = PhysicalPlan::Union {
497            all: true,
498            cost: Cost::default(),
499            inputs: vec![
500                PhysicalPlan::Empty { columns: vec!["x".to_string()] },
501                PhysicalPlan::Empty { columns: vec!["x".to_string()] },
502            ],
503        };
504
505        let ctx = ExecutionContext::new();
506        let mut executor = Executor::new(&plan, ctx).unwrap();
507        assert_eq!(executor.count().unwrap(), 0);
508    }
509
510    #[test]
511    fn executor_set_op_intersect() {
512        use crate::plan::logical::SetOpType;
513        use crate::plan::physical::Cost;
514
515        let plan = PhysicalPlan::SetOp {
516            op_type: SetOpType::Intersect,
517            cost: Cost::default(),
518            left: Box::new(PhysicalPlan::Empty { columns: vec!["x".to_string()] }),
519            right: Box::new(PhysicalPlan::Empty { columns: vec!["x".to_string()] }),
520        };
521
522        let ctx = ExecutionContext::new();
523        let mut executor = Executor::new(&plan, ctx).unwrap();
524        assert_eq!(executor.count().unwrap(), 0);
525    }
526
527    #[test]
528    fn executor_set_op_except() {
529        use crate::plan::logical::SetOpType;
530        use crate::plan::physical::Cost;
531
532        let plan = PhysicalPlan::SetOp {
533            op_type: SetOpType::Except,
534            cost: Cost::default(),
535            left: Box::new(PhysicalPlan::Empty { columns: vec!["x".to_string()] }),
536            right: Box::new(PhysicalPlan::Empty { columns: vec!["x".to_string()] }),
537        };
538
539        let ctx = ExecutionContext::new();
540        let mut executor = Executor::new(&plan, ctx).unwrap();
541        assert_eq!(executor.count().unwrap(), 0);
542    }
543
544    #[test]
545    fn executor_empty_union() {
546        use crate::plan::physical::Cost;
547
548        // Empty Union (no inputs)
549        let plan = PhysicalPlan::Union { all: false, cost: Cost::default(), inputs: vec![] };
550
551        let ctx = ExecutionContext::new();
552        let mut executor = Executor::new(&plan, ctx).unwrap();
553        assert_eq!(executor.count().unwrap(), 0);
554    }
555}