vortex_array/pipeline/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4#![allow(unused_variables)]
5#![cfg_attr(vortex_nightly, feature(portable_simd))]
6//! Vortex crate containing vectorized pipeline processing.
7//!
8//! This module contains experiments into pipelined data processing within Vortex.
9//!
10//! Arrays (and eventually Layouts) will be convertible into a [`Kernel`] that can then be
11//! exported into a [`ViewMut`] one chunk of [`N`] elements at a time. This allows us to keep
12//! compute largely within the L1 cache, as well as to write out canonical data into externally
13//! provided buffers.
14//!
15//! Each chunk is represented in a canonical physical form, as determined by the logical
16//! [`vortex_dtype::DType`] of the array. This provides a predictable base on which to perform
17//! compute. Unlike DuckDB and other vectorized systems, we force a single canonical representation
18//! instead of supporting multiple encodings because compute push-down is applied a priori to the
19//! logical representation.
20//!
21//! It is a work-in-progress and is not yet used in production.
22
23pub mod bits;
24mod canonical;
25pub mod operators;
26pub mod query;
27mod types;
28pub mod vec;
29pub mod view;
30
/// The number of elements in each step of a Vortex evaluation pipeline.
pub const N: usize = 1024;

/// The number of `usize` words needed to store `N` bits.
///
/// Computed with truncating division, so the value is only exact when `N` is a whole
/// multiple of `usize::BITS`; the assertion below enforces that at compile time.
pub const N_WORDS: usize = N / usize::BITS as usize;

// Guard against `N` being changed to a value that does not fill whole words, which would
// otherwise leave `N_WORDS` silently one word too small to hold `N` bits.
const _: () = assert!(
    (N_WORDS * (usize::BITS as usize)) == N,
    "N must be a multiple of usize::BITS"
);
36
37use std::cell::RefCell;
38
39pub use canonical::*;
40pub use operators::{Operator, OperatorRef};
41pub use types::*;
42use vec::{VectorId, VectorRef};
43use vortex_error::VortexResult;
44
45use self::bits::BitView;
46use self::vec::Vector;
47use self::view::ViewMut;
48
49/// A pipeline provides a push-based way to emit a stream of canonical data.
50///
51/// By passing multiple vector computations through the same pipeline, we can amortize
52/// the setup costs (such as DType validation, stats short-circuiting, etc.), and to make better
53/// use of CPU caches by performing all operations while the data is hot.
54///
55/// By passing a mask into the `step` function, we give encodings visibility into the data that
56/// will be read by their parents. Some encodings may choose to decode all `N` elements, and then
57/// set the given selection mask on the output vector. Other encodings may choose to only unpack
58/// the selected elements.
59///
60/// We are considering further adding a `defined` parameter that indicates which elements are
61/// defined and will be interpreted by the parent. This differs from masking, in that undefined
62/// elements should still live in the correct location, it just doesn't matter what their value
63/// is. This will allow, e.g. a validity encoding to tell its children that the values in certain
64/// positions are going to be masked out anyway, so don't bother doing any expensive compute.
65pub trait Kernel {
66 /// Seek the kernel to a specific chunk offset.
67 ///
68 /// Note this will be called on all kernels in a pipeline.
69 ///
70 /// i.e. the resulting row offset is `idx * N`, where `N` is the number of elements in a chunk.
71 ///
72 /// The reason for a separate seek function (vs passing an offset directly to `step`) is that
73 /// it allows the pipeline to optimize for sequential access patterns, which is common in
74 /// many encodings. For example, a run-length encoding can efficiently seek to the start of a
75 /// chunk without needing to perform a full binary search of the ends in each step.
76 // TODO(ngates): should this be `skip(n)` instead? Depends if we want to support going
77 // backwards?
78 fn seek(&mut self, chunk_idx: usize) -> VortexResult<()> {
79 Ok(())
80 }
81
82 /// Attempts to perform a single step of the pipeline, writing data to the output vector.
83 /// Returns `Poll::Done` if the pipeline is complete, or `Poll::Pending` if buffers are
84 /// required to continue.
85 ///
86 /// The `selected` parameter defines which elements of the chunk should be exported, where
87 /// `None` indicates that all elements are selected.
88 ///
89 // TODO(ngates): we could introduce a `defined` parameter to indicate which elements are
90 // defined and will be interpreted by the parent. This would allow us to skip writing
91 // elements that are not defined, for example if the parent is a dense null validity encoding.
92 fn step(
93 &mut self,
94 ctx: &KernelContext,
95 selected: BitView,
96 out: &mut ViewMut,
97 ) -> VortexResult<()>;
98}
99
100/// Context passed to kernels during execution, providing access to vectors.
101#[derive(Default)]
102pub struct KernelContext {
103 /// Optional allocation plan for resolving vector IDs
104 pub(crate) vectors: Vec<RefCell<Vector>>,
105}
106
107impl KernelContext {
108 pub fn new(allocation_plan: Vec<RefCell<Vector>>) -> Self {
109 Self {
110 vectors: allocation_plan,
111 }
112 }
113
114 /// Get a vector by its ID.
115 pub fn vector(&self, vector_id: VectorId) -> VectorRef<'_> {
116 VectorRef::new(self.vectors[*vector_id].borrow())
117 }
118}
119
120use crate::vtable::{NotSupported, VTable};
121
122pub trait PipelineVTable<V: VTable> {
123 /// Convert the current array into a [`Operator`].
124 /// Returns `None` if the array cannot be converted to an operator.
125 fn to_operator(array: &V::Array) -> VortexResult<Option<OperatorRef>>;
126}
127
128impl<V: VTable> PipelineVTable<V> for NotSupported {
129 fn to_operator(_array: &V::Array) -> VortexResult<Option<OperatorRef>> {
130 Ok(None)
131 }
132}