vortex_array/pipeline/mod.rs
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

#![allow(unused_variables)]
#![cfg_attr(vortex_nightly, feature(portable_simd))]
//! Vortex crate containing vectorized pipeline processing.
//!
//! This module contains experiments into pipelined data processing within Vortex.
//!
//! Arrays (and eventually Layouts) will be convertible into a [`Kernel`] that can then be
//! exported into a [`ViewMut`] one chunk of [`N`] elements at a time. This allows us to keep
//! compute largely within the L1 cache, as well as to write out canonical data into externally
//! provided buffers.
//!
//! Each chunk is represented in a canonical physical form, as determined by the logical
//! [`vortex_dtype::DType`] of the array. This provides a predictable base on which to perform
//! compute. Unlike DuckDB and other vectorized systems, we force a single canonical representation
//! instead of supporting multiple encodings because compute push-down is applied a priori to the
//! logical representation.
//!
//! It is a work-in-progress and is not yet used in production.
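//!
//! As a rough sketch of the intended shape (not a working example: `BitView` and [`ViewMut`]
//! construction are elided, and the driver loop below is illustrative only):
//!
//! ```ignore
//! fn drain<K: Kernel>(
//!     kernel: &mut K,
//!     ctx: &KernelContext,
//!     mut selection_for: impl FnMut(usize) -> BitView,
//!     mut output_for: impl FnMut(usize) -> ViewMut,
//!     num_chunks: usize,
//! ) -> VortexResult<()> {
//!     for chunk_idx in 0..num_chunks {
//!         // Position every kernel in the pipeline at row offset `chunk_idx * N`...
//!         kernel.seek(chunk_idx)?;
//!         // ...then export one chunk of up to `N` canonical elements into the output view.
//!         let mut out = output_for(chunk_idx);
//!         kernel.step(ctx, selection_for(chunk_idx), &mut out)?;
//!     }
//!     Ok(())
//! }
//! ```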

pub mod bits;
mod canonical;
pub mod operators;
pub mod query;
mod types;
pub mod vec;
pub mod view;

/// The number of elements in each step of a Vortex evaluation pipeline.
pub const N: usize = 1024;

/// The number of `usize` words needed to store [`N`] bits.
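/// For example, with `N = 1024` this is 16 words (1024 / 64) on a 64-bit target and 32 words
/// (1024 / 32) on a 32-bit target.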
pub const N_WORDS: usize = N / usize::BITS as usize;

use std::cell::RefCell;
use std::rc::Rc;

pub use canonical::*;
pub use operators::Operator;
pub use types::*;
use vec::{VectorId, VectorRef};
use vortex_error::VortexResult;

use self::bits::BitView;
use self::vec::Vector;
use self::view::ViewMut;

/// A pipeline provides a push-based way to emit a stream of canonical data.
///
/// By passing multiple vector computations through the same pipeline, we can amortize the
/// setup costs (such as DType validation, stats short-circuiting, etc.) and make better use
/// of CPU caches by performing all operations while the data is hot.
///
/// By passing a mask into the `step` function, we give encodings visibility into the data that
/// will be read by their parents. Some encodings may choose to decode all `N` elements, and then
/// set the given selection mask on the output vector. Other encodings may choose to only unpack
/// the selected elements.
///
/// We are considering further adding a `defined` parameter that indicates which elements are
/// defined and will be interpreted by the parent. This differs from masking in that undefined
/// elements should still live in the correct location; it just doesn't matter what their values
/// are. This will allow, e.g., a validity encoding to tell its children that the values in
/// certain positions are going to be masked out anyway, so don't bother doing any expensive
/// compute.
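///
/// As a sketch only (the `Identity` wrapper below is hypothetical and not part of this module),
/// a composite encoding might nest kernels roughly like this:
///
/// ```ignore
/// /// A pass-through kernel that forwards every call to its child kernel.
/// struct Identity<K> {
///     child: K,
/// }
///
/// impl<K: Kernel> Kernel for Identity<K> {
///     fn seek(&mut self, chunk_idx: usize) -> VortexResult<()> {
///         // Seeking must be propagated so the child is positioned at the same chunk.
///         self.child.seek(chunk_idx)
///     }
///
///     fn step(
///         &mut self,
///         ctx: &KernelContext,
///         selected: BitView,
///         out: &mut ViewMut,
///     ) -> VortexResult<()> {
///         // No transformation: the child writes its canonical chunk straight into `out`.
///         self.child.step(ctx, selected, out)
///     }
/// }
/// ```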
pub trait Kernel {
    /// Seek the kernel to a specific chunk offset.
    ///
    /// Note this will be called on all kernels in a pipeline.
    ///
    /// i.e. the resulting row offset is `chunk_idx * N`, where `N` is the number of elements in
    /// a chunk.
    ///
    /// The reason for a separate seek function (vs passing an offset directly to `step`) is that
    /// it allows the pipeline to optimize for sequential access patterns, which are common in
    /// many encodings. For example, a run-length encoding can efficiently seek to the start of a
    /// chunk without needing to perform a full binary search of the run ends in each step.
    // TODO(ngates): should this be `skip(n)` instead? Depends if we want to support going
    // backwards?
    fn seek(&mut self, chunk_idx: usize) -> VortexResult<()> {
        Ok(())
    }

    /// Attempts to perform a single step of the pipeline, writing data to the output vector.
    ///
    /// The `selected` parameter defines which elements of the chunk should be exported.
    // TODO(ngates): we could introduce a `defined` parameter to indicate which elements are
    // defined and will be interpreted by the parent. This would allow us to skip writing
    // elements that are not defined, for example if the parent is a dense null validity encoding.
    fn step(
        &mut self,
        ctx: &KernelContext,
        selected: BitView,
        out: &mut ViewMut,
    ) -> VortexResult<()>;
}

/// Context passed to kernels during execution, providing access to vectors.
#[derive(Default)]
pub struct KernelContext {
    /// Allocation plan for resolving vector IDs.
    pub(crate) vectors: Vec<RefCell<Vector>>,
}

impl KernelContext {
    pub fn new(allocation_plan: Vec<RefCell<Vector>>) -> Self {
        Self {
            vectors: allocation_plan,
        }
    }

    /// Get a vector by its ID.
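    ///
    /// Panics if `vector_id` is out of range, or if the vector is currently mutably borrowed.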
    pub fn vector(&self, vector_id: VectorId) -> VectorRef<'_> {
        VectorRef::new(self.vectors[*vector_id].borrow())
    }
}

use crate::vtable::{NotSupported, VTable};

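/// VTable trait for converting an array into a pipeline [`Operator`].
///
/// As a rough sketch only (`FooVTable`, `FooArray`, and `FooOperator` are hypothetical names,
/// not types in this crate), an encoding that supports pipelined evaluation might implement
/// this along the lines of:
///
/// ```ignore
/// impl PipelineVTable<FooVTable> for FooVTable {
///     fn to_operator(array: &FooArray) -> VortexResult<Option<Rc<dyn Operator>>> {
///         // Wrap the array in an operator that knows how to emit its chunks.
///         Ok(Some(Rc::new(FooOperator::new(array.clone()))))
///     }
/// }
/// ```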
pub trait PipelineVTable<V: VTable> {
    /// Convert the current array into an [`Operator`].
    ///
    /// Returns `None` if the array cannot be converted to an operator.
    fn to_operator(array: &V::Array) -> VortexResult<Option<Rc<dyn Operator>>>;
}

impl<V: VTable> PipelineVTable<V> for NotSupported {
    fn to_operator(_array: &V::Array) -> VortexResult<Option<Rc<dyn Operator>>> {
        Ok(None)
    }
}