vortex_array/pipeline/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4pub mod driver;
5
6use vortex_error::{VortexExpect, VortexResult};
7use vortex_vector::{Vector, VectorMut};
8
9/// A view over a fixed-size `N`-bit vector used in Vortex pipeline execution.
10pub type BitView<'a> = vortex_buffer::BitView<'a, N_BYTES>;
11
/// The number of elements in each step of a Vortex evaluation operator.
pub const N: usize = 1024;

/// Number of bytes needed to store `N` bits.
pub const N_BYTES: usize = N / 8;

/// Number of `usize` words needed to store `N` bits.
pub const N_WORDS: usize = N / usize::BITS as usize;

// The integer divisions above truncate silently. If `N` were ever changed to a value that is not
// a whole number of bytes / words, the byte and word counts would be too small to hold `N` bits,
// so reject such values at compile time.
const _: () = assert!(N_BYTES * 8 == N, "N must be a multiple of 8");
const _: () = assert!(
    N_WORDS * usize::BITS as usize == N,
    "N must be a multiple of usize::BITS"
);
20
21/// A pipeline node is a trait that enables an array to participate in pipelined execution.
22pub trait PipelinedNode {
23    /// Returns information about the children of this node and how the node should participate
24    /// in pipelined execution.
25    fn inputs(&self) -> PipelineInputs;
26
27    /// Bind the node into a [`Kernel`] for pipelined execution.
28    fn bind(&self, ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>>;
29}
30
/// Describes the type of pipeline node and its input information.
///
/// Returned by [`PipelinedNode::inputs`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipelineInputs {
    /// This node acts as a pipeline source.
    ///
    /// All array inputs will be available as pre-computed batch inputs in the [`BindContext`].
    Source,

    /// This node acts as a transform node.
    ///
    /// Each listed index identifies a child that should be provided as a pipelined input. Each
    /// pipelined input should be bound to a [`VectorId`] via the [`BindContext`] and then
    /// accessed within the kernel by passing the [`VectorId`] to the [`KernelCtx`].
    ///
    /// All other children will be available as pre-computed batch inputs in the [`BindContext`].
    Transform {
        /// Indices of the children that are supplied as pipelined inputs.
        pipelined_inputs: Vec<usize>,
    },
    // TODO(ngates): we may want a Chain variant in the future to support pipelining chunked arrays
}
48
49/// The context used when binding an operator for execution.
50pub trait BindContext {
51    /// Returns the [`VectorId`] for the given child that can be passed to the
52    /// [`KernelCtx`] within each step to access the given input.
53    ///
54    /// Note that this child index references the pipelined inputs only, not all children of the
55    /// array.
56    fn pipelined_input(&self, pipelined_child_idx: usize) -> VectorId;
57
58    /// Returns the batch input vector for the given child.
59    ///
60    /// Note that this child index references the batch inputs only, not all children of the
61    /// array.
62    fn batch_input(&mut self, batch_child_idx: usize) -> Vector;
63}
64
65/// A pipeline kernel is a stateful object that performs steps of a pipeline.
66///
67/// Each step of the kernel processes zero or more input vectors, and writes output to a
68/// pre-allocated mutable output vector.
69///
70/// Input vectors will either have length [`N`], indicating that all elements from the step are
71/// present. Or they will have length equal to the [`BitView::true_count`] of the selection mask,
72/// in which case only the selected elements are present.
73///
74/// Output vectors will always be passed with length zero.
75///
76/// Kernels may choose to output either all `N` elements in their original positions, or output
77/// only the selected elements to the first `true_count` positions of the output vector. When
78/// emitting `N` elements in-place, the kernel may omit expensive computations over the unselected
79/// elements, provided that the output elements in those positions are still valid (i.e. typically
80/// zeroed, rather than undefined).
81///
82/// The pipeline driver will verify these conditions before and after each step.
83pub trait Kernel: Send {
84    /// Perform a single step of the kernel.
85    fn step(
86        &mut self,
87        ctx: &KernelCtx,
88        selection: &BitView,
89        out: &mut VectorMut,
90    ) -> VortexResult<()>;
91}
92
93/// The context provided to kernels during execution to access input vectors.
94pub struct KernelCtx {
95    vectors: Vec<Option<VectorMut>>,
96}
97
98impl KernelCtx {
99    fn new(vectors: Vec<VectorMut>) -> Self {
100        Self {
101            vectors: vectors.into_iter().map(Some).collect(),
102        }
103    }
104
105    /// Returns the input vector at the given index.
106    ///
107    /// Note that a [`VectorMut`] is returned here, indicating that this is the only instance of
108    /// the data. It does not imply that the caller is able to mutate the data (it is returned
109    /// as an immutable reference).
110    ///
111    /// # Panics
112    ///
113    /// If the input vector at the given index is not available (typically because the vector
114    /// happens to be currently borrowed as an output vector!).
115    pub fn input(&mut self, id: VectorId) -> &VectorMut {
116        self.vectors[id.0]
117            .as_ref()
118            .vortex_expect("Input vector at index is not available")
119    }
120
121    #[inline]
122    fn take_output(&mut self, id: &VectorId) -> VectorMut {
123        self.vectors[id.0]
124            .take()
125            .vortex_expect("Output vector at index is not available")
126    }
127
128    #[inline]
129    fn replace_output(&mut self, id: &VectorId, vec: VectorMut) {
130        self.vectors[id.0] = Some(vec);
131    }
132}
133
/// A unique, opaque identifier for a vector in the pipeline execution context.
///
/// Obtained from [`BindContext::pipelined_input`] and passed to [`KernelCtx::input`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct VectorId(usize);

impl VectorId {
    /// Creates an identifier for the vector stored at slot `idx` of a [`KernelCtx`].
    ///
    /// Deliberately non-public so the type stays opaque to end users.
    fn new(idx: usize) -> Self {
        Self(idx)
    }
}