vortex_array/pipeline/query/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod buffers;
5mod dag;
6mod operators;
7mod query_execution;
8mod toposort;
9
10use vortex_error::VortexResult;
11
12use crate::pipeline::bits::BitView;
13use crate::pipeline::operators::Operator;
14pub use crate::pipeline::query::buffers::VectorAllocationPlan;
15use crate::pipeline::query::dag::DagNode;
16use crate::pipeline::query::query_execution::QueryExecution;
17use crate::pipeline::view::ViewMut;
18use crate::pipeline::{Kernel, KernelContext};
19
20/// The idea of a query-plan is to orchestrate driving a set of operators to completion with
21/// fully optimized resource usage.
22///
23/// During construction, the plan is analyzed to determine the optimal way to execute the nodes.
24/// This includes:
25/// - Sub-expression elimination: Identifying common sub-expressions and reusing them.
26/// - Vector allocation: Determining how many intermediate vectors are needed.
27/// - Buffer management: Managing the buffers that hold the data for each node.
28pub struct QueryPlan<'a> {
29    /// Nodes in the DAG representing the execution plan with common sub-expressions eliminated.
30    dag: Vec<DagNode<'a>>,
31    /// The topological order of `dag` nodes for execution.
32    execution_order: Vec<usize>,
33    /// The allocation plan for vectors used by the pipeline.
34    allocation_plan: VectorAllocationPlan,
35}
36
37impl<'a> QueryPlan<'a> {
38    // TODO(ngates): can we pass the mask in here such that the plan can replace empty nodes?
39    pub fn new(plan: &'a dyn Operator) -> VortexResult<Self> {
40        // Step 1: Convert the plan tree to a DAG by eliminating common sub-expressions.
41        let (dag_root, dag) = Self::build_dag(plan)?;
42        let node_count = dag.len();
43
44        // Step 2: Determine execution order (topological sort)
45        let execution_order = Self::topological_sort(&dag)?;
46
47        // Step 3: Allocate vectors
48        let allocation_plan = Self::allocate_vectors(&dag, &execution_order)?;
49
50        Ok(Self {
51            dag,
52            execution_order,
53            allocation_plan,
54        })
55    }
56
57    pub fn executable_plan(self) -> VortexResult<QueryExecution> {
58        // Construct the operators, binding their inputs using the allocation plan.
59        let operators = Self::bind_operators(&self.dag, &self.allocation_plan)?;
60        let kernel_context = KernelContext::new(self.allocation_plan.vectors);
61
62        Ok(QueryExecution {
63            operators,
64            execution_schedule: self.execution_order,
65            kernel_context,
66            output_targets: self.allocation_plan.output_targets,
67        })
68    }
69}
70
71/// FIXME(ngates): this is a hack for testing
72impl Kernel for QueryExecution {
73    fn seek(&mut self, chunk_idx: usize) -> VortexResult<()> {
74        self._seek(chunk_idx)
75    }
76
77    fn step(
78        &mut self,
79        ctx: &KernelContext,
80        selected: BitView,
81        out: &mut ViewMut,
82    ) -> VortexResult<()> {
83        self._step(selected, out)
84    }
85}