vortex_array/pipeline/driver/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4pub mod allocation;
5mod bind;
6mod input;
7mod toposort;
8
9use std::hash::{BuildHasher, Hash, Hasher};
10
11use itertools::Itertools;
12use vortex_dtype::DType;
13use vortex_error::{VortexResult, vortex_ensure};
14use vortex_mask::Mask;
15use vortex_utils::aliases::hash_map::{HashMap, RandomState};
16use vortex_vector::{Vector, VectorMut, VectorMutOps};
17
18use crate::pipeline::driver::allocation::{OutputTarget, allocate_vectors};
19use crate::pipeline::driver::bind::bind_kernels;
20use crate::pipeline::driver::toposort::topological_sort;
21use crate::pipeline::{BitView, Kernel, KernelCtx, N, PipelineInputs};
22use crate::{Array, ArrayEq, ArrayHash, ArrayOperator, ArrayRef, ArrayVisitor, Precision};
23
/// A pipeline driver takes a Vortex array and executes it into a canonical vector.
///
/// The driver builds up a DAG of pipeline nodes from the array tree up to the edges of this
/// pipeline. The edge of a pipeline is defined as an array node that has zero pipelined children.
/// In other words, a pipeline encompasses the execution of a single "domain" of rows, where each
/// node has the same understanding of what a single "row" is. For example, for a DictArray the
/// codes child is pipelined and therefore the pipeline DAG continues, but the values child is not
/// pipelined and will be executed via a separate pipeline driver.
///
/// Once constructed, the pipeline driver can be executed to produce a canonical vector.
#[derive(Clone, Debug)]
pub(crate) struct PipelineDriver {
    /// The pipeline stored as a DAG where all `NodeId`s index into the dag vec.
    dag: Vec<Node>,
    /// The id of the node built for the array that was handed to the constructor. The dtype of
    /// this node's array is used as the dtype of the executed output.
    root: NodeId,

    /// The set of _all_ non-pipelined children from _all_ nodes of the pipeline, plus every array
    /// that is itself not pipelined (those become `Input` nodes). `BatchId`s index into this vec.
    batch_inputs: Vec<ArrayRef>,
}
43
/// Index of a [`Node`] within the pipeline's `dag` vec.
type NodeId = usize;
/// Index of an array within the pipeline's `batch_inputs` vec.
type BatchId = usize;
46
/// A single node of the pipeline DAG, wrapping one array of the original array tree.
#[derive(Debug, Clone)]
struct Node {
    /// This node's underlying array.
    array: ArrayRef,
    /// The type of pipeline node.
    #[allow(dead_code)] // TODO(ngates): pipeline execute does not yet use this
    kind: NodeKind,
    /// The indices of the pipelined children nodes in the `dag` vec.
    children: Vec<NodeId>,
    /// The indices of this node's parents in the `dag` vec. Left empty while the DAG is being
    /// built and filled in afterwards from the `children` lists.
    parents: Vec<NodeId>,
    /// The IDs of the batch inputs that feed into this node, as indices into the driver's
    /// `batch_inputs` vec.
    batch_inputs: Vec<BatchId>,
}
61
/// Classifies how a [`Node`] obtains the data it contributes to the pipeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NodeKind {
    /// An input node feeds a batch vector into the pipeline chunk-by-chunk.
    ///
    /// Created for arrays whose `as_pipelined()` returns `None`; the array itself is the node's
    /// only batch input.
    Input,
    /// A source node provides input to the pipeline by writing into mutable output vectors one
    /// batch at a time.
    ///
    /// Created for pipelined arrays reporting `PipelineInputs::Source`; all of its children are
    /// batch inputs.
    Source,
    /// A transform node takes pipelined inputs from its children and produces output vectors.
    ///
    /// Created for pipelined arrays reporting `PipelineInputs::Transform`; children not listed as
    /// pipelined inputs are batch inputs.
    Transform,
}
72
73impl PipelineDriver {
74    /// Construct a pipeline driver from the given array.
75    ///
76    /// The constructor will traverse the array tree, walking the edges where the child is
77    /// reported to be a "pipelined" input.
78    pub fn new(array: ArrayRef) -> PipelineDriver {
79        fn visit_node(
80            array: ArrayRef,
81            dag: &mut Vec<Node>,
82            batch: &mut Vec<ArrayRef>,
83            hash_to_id: &mut HashMap<u64, NodeId>,
84            random_state: &RandomState,
85        ) -> NodeId {
86            // Compute the hash for this subtree.
87            let subtree_hash = random_state.hash_one(ArrayKey(array.clone()));
88
89            // Check if we've seen this subtree before (sub-expression elimination)
90            if let Some(&existing_index) = hash_to_id.get(&subtree_hash) {
91                // Reuse existing node
92                return existing_index;
93            }
94
95            let node = match array.as_pipelined() {
96                None => {
97                    // If the array cannot be executed as a pipeline, then it becomes a view node.
98                    let batch_id = batch.len();
99                    batch.push(array.clone());
100
101                    Node {
102                        array,
103                        kind: NodeKind::Input,
104                        children: vec![],
105                        parents: vec![],
106                        batch_inputs: vec![batch_id],
107                    }
108                }
109                Some(pipelined) => match pipelined.inputs() {
110                    PipelineInputs::Source => {
111                        // All inputs of a source node are batch inputs.
112                        let children = array.children();
113                        let mut batch_inputs = Vec::with_capacity(children.len());
114                        for child in children {
115                            batch_inputs.push(batch.len());
116                            batch.push(child);
117                        }
118
119                        Node {
120                            array,
121                            kind: NodeKind::Source,
122                            children: vec![],
123                            parents: vec![],
124                            batch_inputs,
125                        }
126                    }
127                    PipelineInputs::Transform { pipelined_inputs } => {
128                        // Only one child is the pipelined input
129                        let children = array.children();
130                        let mut batch_inputs = Vec::with_capacity(children.len());
131                        let mut pipeline_inputs = Vec::with_capacity(1);
132
133                        for (child_idx, child) in children.into_iter().enumerate() {
134                            if pipelined_inputs.contains(&child_idx) {
135                                pipeline_inputs.push(visit_node(
136                                    child.clone(),
137                                    dag,
138                                    batch,
139                                    hash_to_id,
140                                    random_state,
141                                ));
142                            } else {
143                                let batch_id = batch.len();
144                                batch.push(child);
145                                batch_inputs.push(batch_id);
146                            }
147                        }
148
149                        Node {
150                            array,
151                            kind: NodeKind::Transform,
152                            children: pipeline_inputs,
153                            parents: vec![],
154                            batch_inputs,
155                        }
156                    }
157                },
158            };
159
160            let node_id = dag.len();
161            dag.push(node);
162            hash_to_id.insert(subtree_hash, node_id);
163
164            node_id
165        }
166
167        // Build the DAG
168        let mut dag = vec![];
169        let mut batch = vec![];
170        let mut hash_to_id: HashMap<u64, NodeId> = HashMap::new();
171        let random_state = RandomState::default();
172        let root_index = visit_node(array, &mut dag, &mut batch, &mut hash_to_id, &random_state);
173
174        // Fill in parent relationships
175        for i in 0..dag.len() {
176            let children = dag[i].children.clone();
177            for &child_idx in &children {
178                dag[child_idx].parents.push(i);
179            }
180        }
181
182        PipelineDriver {
183            root: root_index,
184            dag,
185            batch_inputs: batch,
186        }
187    }
188
189    fn root_array(&self) -> &ArrayRef {
190        &self.dag[self.root].array
191    }
192
193    /// Execute the pipeline after first executing all batch inputs.
194    pub fn execute(self, selection: &Mask) -> VortexResult<Vector> {
195        let dtype = self.root_array().dtype().clone();
196
197        // Execute the batch inputs of the pipeline.
198        let batch_inputs: Vec<_> = self
199            .batch_inputs
200            .into_iter()
201            .map(|array| array.execute().map(Some))
202            .try_collect()?;
203
204        // Compute the toposort of the DAG
205        let exec_order = topological_sort(&self.dag)?;
206
207        // Compute an allocation plan for intermediate vectors
208        let allocation_plan = allocate_vectors(&self.dag, &exec_order)?;
209
210        // Bind each node in the DAG to create its kernel
211        let kernels = bind_kernels(self.dag, &allocation_plan, batch_inputs)?;
212
213        // Construct the kernel execution context
214        let ctx = KernelCtx::new(allocation_plan.vectors);
215
216        Pipeline {
217            dtype,
218            ctx,
219            kernels,
220            exec_order,
221            output_targets: allocation_plan.output_targets,
222        }
223        .execute(selection)
224    }
225}
226
/// A fully bound pipeline, ready to be stepped chunk-by-chunk over a selection mask.
struct Pipeline {
    /// The dtype used to allocate the output vector.
    dtype: DType,
    /// The kernel execution context, built from the allocation plan's vectors. Intermediate
    /// output vectors are taken from and returned to it on every step.
    ctx: KernelCtx,
    /// The bound kernels, indexed by `NodeId`.
    kernels: Vec<Box<dyn Kernel>>,
    /// The node ids in the order produced by the topological sort; kernels run in this order on
    /// every step.
    exec_order: Vec<NodeId>,
    /// Where each node's kernel writes its output, indexed by `NodeId`.
    output_targets: Vec<OutputTarget>,
}
234
235impl Pipeline {
236    fn execute(&mut self, selection: &Mask) -> VortexResult<Vector> {
237        // Start by allocating the output vector.
238        let capacity = selection.true_count().next_multiple_of(N);
239        let mut output = VectorMut::with_capacity(&self.dtype, capacity);
240
241        match selection {
242            Mask::AllFalse(_) => {}
243            Mask::AllTrue(_) => {
244                // Run the operator to completion with all rows selected.
245                // The number of _full_ chunks we need to process.
246                let nchunks = selection.len() / N;
247                for _ in 0..nchunks {
248                    self.step(&BitView::all_true(), &mut output)?;
249                }
250
251                // Now process the final partial chunk, if any.
252                let remaining = selection.len() % N;
253                if remaining > 0 {
254                    let selection_view = BitView::with_prefix(remaining);
255                    self.step(&selection_view, &mut output)?;
256                }
257            }
258            Mask::Values(mask_values) => {
259                // Loop over each chunk of N elements in the mask as a bit view.
260                let selection_bits = mask_values.bit_buffer();
261                for selection_view in selection_bits.iter_bit_views() {
262                    self.step(&selection_view, &mut output)?;
263                }
264            }
265        }
266
267        Ok(output.freeze())
268    }
269
270    /// Perform a single step of the pipeline.
271    fn step(&mut self, selection: &BitView, output: &mut VectorMut) -> VortexResult<()> {
272        // Loop over the kernels in toposorted execution order.
273        for &node_idx in self.exec_order.iter() {
274            let kernel = &mut self.kernels[node_idx];
275
276            // Depending on the output target, either write directly to the pipeline output, or
277            // take the intermediate vector and write into that.
278            match &self.output_targets[node_idx] {
279                OutputTarget::ExternalOutput => {
280                    // We split off the next N elements of capacity from the external output vector.
281                    let mut tail = output.split_off(output.len());
282                    assert!(tail.is_empty());
283
284                    kernel.step(&self.ctx, selection, &mut tail)?;
285
286                    let len = tail.len();
287                    vortex_ensure!(
288                        len == N || len == selection.true_count(),
289                        "Kernel produced incorrect number of output elements, \
290                            expected either {N} or {}, got {len}",
291                        selection.true_count(),
292                    );
293
294                    // Since we are writing to the final vector, there are no other kernels who we
295                    // can delegate filtering the selection mask out to, so check if we need to do
296                    // a final filter before we return.
297                    if selection.true_count() < N && len == N {
298                        // tail.filter(selection_mask)
299                        todo!("Filter via a bit mask")
300                    }
301
302                    // Now we join the produced output back to the main output vector.
303                    output.unsplit(tail);
304                }
305                OutputTarget::IntermediateVector(vector_id) => {
306                    let mut out_vector = self.ctx.take_output(vector_id);
307                    out_vector.clear();
308                    debug_assert!(out_vector.is_empty());
309
310                    kernel.step(&self.ctx, selection, &mut out_vector)?;
311
312                    let len = out_vector.len();
313                    vortex_ensure!(
314                        len == N || len == selection.true_count(),
315                        "Kernel produced incorrect number of output elements, \
316                            expected either {N} or {}, got {len}",
317                        selection.true_count(),
318                    );
319
320                    // If the kernel added N elements, the output is in-place.
321                    self.ctx.replace_output(vector_id, out_vector);
322                }
323            };
324        }
325
326        Ok(())
327    }
328}
329
/// A hashable array compared with [`Precision::Ptr`].
///
/// Newtype over [`ArrayRef`] that adapts the array's own `array_hash` / `array_eq` to the
/// standard [`Hash`], [`PartialEq`] and [`Eq`] traits. Hashing and equality use the same
/// precision so that the two stay consistent with each other.
struct ArrayKey(ArrayRef);
impl Hash for ArrayKey {
    // Forwards to the array's `array_hash`, handing it a mutable reference to `state`.
    // NOTE(review): the extra `&mut` around `state` presumably matches what `array_hash`
    // expects — confirm against its signature before simplifying.
    fn hash<H: Hasher>(&self, mut state: &mut H) {
        self.0.array_hash(&mut state, Precision::Ptr)
    }
}
impl PartialEq for ArrayKey {
    // Two keys are equal when their arrays are equal under `Precision::Ptr`.
    fn eq(&self, other: &Self) -> bool {
        self.0.array_eq(&other.0, Precision::Ptr)
    }
}
// Marker impl: relies on `array_eq` being a full equivalence relation — TODO confirm.
impl Eq for ArrayKey {}