vortex_array/pipeline/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4pub mod driver;
5
6use vortex_error::{VortexExpect, VortexResult};
7use vortex_vector::{Vector, VectorMut};
8
9/// A view over a fixed-size `N`-bit vector used in Vortex pipeline execution.
10pub type BitView<'a> = vortex_buffer::BitView<'a, N_BYTES>;
11
/// Step size of the pipeline: how many elements each step of a Vortex evaluation operator
/// covers.
pub const N: usize = 1024;

/// How many bytes it takes to hold `N` bits.
pub const N_BYTES: usize = N / (u8::BITS as usize);

/// How many `usize` words it takes to hold `N` bits.
pub const N_WORDS: usize = N / (usize::BITS as usize);
20
21/// A pipeline node is a trait that enables an array to participate in pipelined execution.
22pub trait PipelinedNode {
23 /// Returns information about the children of this node and how the node should participate
24 /// in pipelined execution.
25 fn inputs(&self) -> PipelineInputs;
26
27 /// Bind the node into a [`Kernel`] for pipelined execution.
28 fn bind(&self, ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>>;
29}
30
/// The kind of pipeline node an array represents, together with information about its inputs.
pub enum PipelineInputs {
    /// The node is a source of the pipeline.
    ///
    /// Every array input is made available as a pre-computed batch input in the
    /// [`BindContext`].
    Source,

    /// The node is a transform over other pipelined nodes.
    ///
    /// `pipelined_inputs` holds the indices of the children that should be provided as
    /// pipelined inputs. Bind each of them to a [`VectorId`] via the [`BindContext`], then read
    /// it inside the kernel by passing that [`VectorId`] to the [`KernelCtx`].
    ///
    /// Every child that is not listed is made available as a pre-computed batch input in the
    /// [`BindContext`].
    Transform {
        /// Indices of the children to be provided as pipelined inputs.
        pipelined_inputs: Vec<usize>,
    },
    // TODO(ngates): we may want a Chain variant in the future to support pipelining chunked arrays
}
48
49/// The context used when binding an operator for execution.
50pub trait BindContext {
51 /// Returns the [`VectorId`] for the given child that can be passed to the
52 /// [`KernelCtx`] within each step to access the given input.
53 ///
54 /// Note that this child index references the pipelined inputs only, not all children of the
55 /// array.
56 fn pipelined_input(&self, pipelined_child_idx: usize) -> VectorId;
57
58 /// Returns the batch input vector for the given child.
59 ///
60 /// Note that this child index references the batch inputs only, not all children of the
61 /// array.
62 fn batch_input(&mut self, batch_child_idx: usize) -> Vector;
63}
64
65/// A pipeline kernel is a stateful object that performs steps of a pipeline.
66///
67/// Each step of the kernel processes zero or more input vectors, and writes output to a
68/// pre-allocated mutable output vector.
69///
70/// Input vectors will either have length [`N`], indicating that all elements from the step are
71/// present. Or they will have length equal to the [`BitView::true_count`] of the selection mask,
72/// in which case only the selected elements are present.
73///
74/// Output vectors will always be passed with length zero.
75///
76/// Kernels may choose to output either all `N` elements in their original positions, or output
77/// only the selected elements to the first `true_count` positions of the output vector. When
78/// emitting `N` elements in-place, the kernel may omit expensive computations over the unselected
79/// elements, provided that the output elements in those positions are still valid (i.e. typically
80/// zeroed, rather than undefined).
81///
82/// The pipeline driver will verify these conditions before and after each step.
83pub trait Kernel: Send {
84 /// Perform a single step of the kernel.
85 fn step(
86 &mut self,
87 ctx: &KernelCtx,
88 selection: &BitView,
89 out: &mut VectorMut,
90 ) -> VortexResult<()>;
91}
92
93/// The context provided to kernels during execution to access input vectors.
94pub struct KernelCtx {
95 vectors: Vec<Option<VectorMut>>,
96}
97
98impl KernelCtx {
99 fn new(vectors: Vec<VectorMut>) -> Self {
100 Self {
101 vectors: vectors.into_iter().map(Some).collect(),
102 }
103 }
104
105 /// Returns the input vector at the given index.
106 ///
107 /// Note that a [`VectorMut`] is returned here, indicating that this is the only instance of
108 /// the data. It does not imply that the caller is able to mutate the data (it is returned
109 /// as an immutable reference).
110 ///
111 /// # Panics
112 ///
113 /// If the input vector at the given index is not available (typically because the vector
114 /// happens to be currently borrowed as an output vector!).
115 pub fn input(&mut self, id: VectorId) -> &VectorMut {
116 self.vectors[id.0]
117 .as_ref()
118 .vortex_expect("Input vector at index is not available")
119 }
120
121 #[inline]
122 fn take_output(&mut self, id: &VectorId) -> VectorMut {
123 self.vectors[id.0]
124 .take()
125 .vortex_expect("Output vector at index is not available")
126 }
127
128 #[inline]
129 fn replace_output(&mut self, id: &VectorId, vec: VectorMut) {
130 self.vectors[id.0] = Some(vec);
131 }
132}
133
/// Opaque handle that uniquely identifies one vector within the pipeline execution context.
#[derive(Clone, Copy, Debug)]
pub struct VectorId(usize);

impl VectorId {
    // Deliberately not `pub`: end users should not be able to build ids themselves, which keeps
    // the type opaque.
    fn new(index: usize) -> Self {
        Self(index)
    }
}
142}