vortex_array/pipeline/query/
mod.rs1mod buffers;
5mod dag;
6mod operators;
7mod query_execution;
8mod toposort;
9
10use vortex_error::VortexResult;
11
12use crate::pipeline::bits::BitView;
13use crate::pipeline::operators::Operator;
14pub use crate::pipeline::query::buffers::VectorAllocationPlan;
15use crate::pipeline::query::dag::DagNode;
16use crate::pipeline::query::query_execution::QueryExecution;
17use crate::pipeline::view::ViewMut;
18use crate::pipeline::{Kernel, KernelContext};
19
20pub struct QueryPlan<'a> {
29 dag: Vec<DagNode<'a>>,
31 execution_order: Vec<usize>,
33 allocation_plan: VectorAllocationPlan,
35}
36
37impl<'a> QueryPlan<'a> {
38 pub fn new(plan: &'a dyn Operator) -> VortexResult<Self> {
40 let (dag_root, dag) = Self::build_dag(plan)?;
42 let node_count = dag.len();
43
44 let execution_order = Self::topological_sort(&dag)?;
46
47 let allocation_plan = Self::allocate_vectors(&dag, &execution_order)?;
49
50 Ok(Self {
51 dag,
52 execution_order,
53 allocation_plan,
54 })
55 }
56
57 pub fn executable_plan(self) -> VortexResult<QueryExecution> {
58 let operators = Self::bind_operators(&self.dag, &self.allocation_plan)?;
60 let kernel_context = KernelContext::new(self.allocation_plan.vectors);
61
62 Ok(QueryExecution {
63 operators,
64 execution_schedule: self.execution_order,
65 kernel_context,
66 output_targets: self.allocation_plan.output_targets,
67 })
68 }
69}
70
71impl Kernel for QueryExecution {
73 fn seek(&mut self, chunk_idx: usize) -> VortexResult<()> {
74 self._seek(chunk_idx)
75 }
76
77 fn step(
78 &mut self,
79 ctx: &KernelContext,
80 selected: BitView,
81 out: &mut ViewMut,
82 ) -> VortexResult<()> {
83 self._step(selected, out)
84 }
85}