// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Vortex crate containing vectorized operator processing.
//!
//! This module contains experiments into pipelined data processing within Vortex.
//!
//! Arrays (and eventually Layouts) will be convertible into a [`Kernel`] that can then be
//! exported into a [`ViewMut`] one chunk of [`N`] elements at a time. This allows us to keep
//! compute largely within the L1 cache, as well as to write out canonical data into externally
//! provided buffers.
//!
//! Each chunk is represented in a canonical physical form, as determined by the logical
//! [`vortex_dtype::DType`] of the array. This provides a predictable base on which to perform
//! compute. Unlike DuckDB and other vectorized systems, we force a single canonical representation
//! instead of supporting multiple encodings because compute push-down is applied a priori to the
//! logical representation.
//!
//! It is a work-in-progress and is not yet used in production.
21pub mod bits;
22pub(crate) mod operator;
23mod types;
24pub mod vec;
25pub mod view;
26
/// The number of elements in each step of a Vortex evaluation operator.
pub const N: usize = 1024;

/// Number of `usize` words needed to store `N` bits.
pub const N_WORDS: usize = N / usize::BITS as usize;

// Compile-time guard: the integer division above silently truncates, so the
// word count is only correct while `N` is an exact multiple of the word size.
const _: () = assert!(N % usize::BITS as usize == 0);
32
use std::cell::RefCell;

pub use types::*;
use vec::VectorRef;
use vortex_error::VortexResult;

use self::vec::Vector;
use self::view::ViewMut;
use crate::Canonical;
use crate::operator::Operator;
43
44pub trait PipelinedOperator: Operator {
45 // Whether this operator works by mutating its first child in-place.
46 //
47 // If `true`, the operator is invoked with the first child's input data passed via the
48 // mutable output view. The node is expected to mutate this data in-place.
49 // TODO(ngates): enable this
50 // fn in_place(&self) -> bool {
51 // false
52 // }
53
54 /// Bind the operator into a [`Kernel`] for pipelined execution.
55 fn bind(&self, ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>>;
56
57 /// Returns the child indices of this operator that are passed to the kernel as input vectors.
58 fn vector_children(&self) -> Vec<usize>;
59
60 /// Returns the child indices of this operator that are passed to the kernel as batch inputs.
61 fn batch_children(&self) -> Vec<usize>;
62}
63
64/// The context used when binding an operator for execution.
65pub trait BindContext {
66 fn children(&self) -> &[VectorId];
67
68 fn batch_inputs(&self) -> &[BatchId];
69}
70
/// The ID of the vector to use.
pub type VectorId = usize;
/// The ID of the batch input to use.
pub type BatchId = usize;
75
76/// A operator provides a push-based way to emit a stream of canonical data.
77///
78/// By passing multiple vector computations through the same operator, we can amortize
79/// the setup costs (such as DType validation, stats short-circuiting, etc.), and to make better
80/// use of CPU caches by performing all operations while the data is hot.
81pub trait Kernel: Send {
82 /// Attempts to perform a single step of the operator, writing data to the output vector.
83 ///
84 /// The output vector is guaranteed to have space for at least `N` elements. The kernel
85 /// may write up to `N` elements to the output vector, and must update the length of the
86 /// output vector to reflect the number of elements written.
87 ///
88 /// TODO(ngates): alternatively, we allow the kernel to write sparse output vectors using a
89 /// Selection enum of Prefix(n), Masked(Mask), or All. This would allow parent kernels to
90 /// decide when to flatten the vector. The problem is it becomes ambiguous who is responsible
91 /// for compacting very sparse vectors.
92 fn step(&mut self, ctx: &KernelContext, out: &mut ViewMut) -> VortexResult<()>;
93}
94
95/// Context passed to kernels during execution, providing access to vectors.
96pub struct KernelContext {
97 /// The allocated vectors for intermediate results.
98 pub(crate) vectors: Vec<RefCell<Vector>>,
99 /// The computed batch inputs.
100 pub(crate) batch_inputs: Vec<Canonical>,
101}
102
103impl KernelContext {
104 /// Get a vector by its ID.
105 pub fn vector(&self, vector_id: VectorId) -> VectorRef<'_> {
106 VectorRef::new(self.vectors[vector_id].borrow())
107 }
108
109 /// Get a batch input by its ID.
110 pub fn batch_input(&self, batch_id: BatchId) -> &Canonical {
111 &self.batch_inputs[batch_id]
112 }
113}