Skip to main content

vortex_fastlanes/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4#![allow(clippy::cast_possible_truncation)]
5
6pub use bitpacking::*;
7pub use delta::*;
8pub use r#for::*;
9pub use rle::*;
10use vortex_array::ToCanonical;
11use vortex_array::validity::Validity;
12use vortex_buffer::Buffer;
13use vortex_buffer::BufferMut;
14
15pub mod bit_transpose;
16mod bitpacking;
17mod delta;
18mod r#for;
19mod rle;
20
21pub(crate) const FL_CHUNK_SIZE: usize = 1024;
22
23use bitpacking::compute::is_constant::BitPackedIsConstantKernel;
24use r#for::compute::is_constant::FoRIsConstantKernel;
25use r#for::compute::is_sorted::FoRIsSortedKernel;
26use vortex_array::aggregate_fn::AggregateFnVTable;
27use vortex_array::aggregate_fn::fns::is_constant::IsConstant;
28use vortex_array::aggregate_fn::fns::is_sorted::IsSorted;
29use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
30use vortex_array::session::ArraySessionExt;
31use vortex_session::VortexSession;
32
33/// Initialize fastlanes encodings in the given session.
34pub fn initialize(session: &mut VortexSession) {
35    session.arrays().register(BitPacked);
36    session.arrays().register(Delta);
37    session.arrays().register(FoR);
38    session.arrays().register(RLE);
39
40    // Register the encoding-specific aggregate kernels.
41    session.aggregate_fns().register_aggregate_kernel(
42        BitPacked::ID,
43        Some(IsConstant.id()),
44        &BitPackedIsConstantKernel,
45    );
46    session.aggregate_fns().register_aggregate_kernel(
47        FoR::ID,
48        Some(IsConstant.id()),
49        &FoRIsConstantKernel,
50    );
51    session.aggregate_fns().register_aggregate_kernel(
52        FoR::ID,
53        Some(IsSorted.id()),
54        &FoRIsSortedKernel,
55    );
56}
57
58/// Fill-forward null values in a buffer, replacing each null with the last valid value seen.
59///
60/// The fill-forward state resets to `T::default()` at every [`FL_CHUNK_SIZE`] boundary
61/// so that values from one chunk never leak into the next. This is important because
62/// both RLE and Delta encodings treat each chunk independently: a fill-forwarded value
63/// that crosses a chunk boundary can become an invalid chunk-local index (for RLE) or
64/// an incorrect delta base (for Delta).
65///
66/// Returns the original buffer if there are no nulls (i.e. the validity is
67/// `NonNullable` or `AllValid`), avoiding any allocation or copy.
68pub(crate) fn fill_forward_nulls<T: Copy + Default>(
69    values: Buffer<T>,
70    validity: &Validity,
71) -> Buffer<T> {
72    match validity {
73        Validity::NonNullable | Validity::AllValid => values,
74        Validity::AllInvalid => Buffer::zeroed(values.len()),
75        Validity::Array(validity_array) => {
76            let bit_buffer = validity_array.to_bool().to_bit_buffer();
77            let mut last_valid = T::default();
78            match values.try_into_mut() {
79                Ok(mut to_fill_mut) => {
80                    for (i, (v, is_valid)) in
81                        to_fill_mut.iter_mut().zip(bit_buffer.iter()).enumerate()
82                    {
83                        if is_valid {
84                            last_valid = *v;
85                        } else if i.is_multiple_of(FL_CHUNK_SIZE) {
86                            last_valid = T::default();
87                        } else {
88                            *v = last_valid;
89                        }
90                    }
91                    to_fill_mut.freeze()
92                }
93                Err(to_fill) => {
94                    let mut to_fill_mut = BufferMut::<T>::with_capacity(to_fill.len());
95                    for (i, (v, (out, is_valid))) in to_fill
96                        .iter()
97                        .zip(
98                            to_fill_mut
99                                .spare_capacity_mut()
100                                .iter_mut()
101                                .zip(bit_buffer.iter()),
102                        )
103                        .enumerate()
104                    {
105                        if is_valid {
106                            last_valid = *v;
107                        } else if i.is_multiple_of(FL_CHUNK_SIZE) {
108                            last_valid = T::default();
109                        }
110                        out.write(last_valid);
111                    }
112                    unsafe { to_fill_mut.set_len(to_fill.len()) };
113                    to_fill_mut.freeze()
114                }
115            }
116        }
117    }
118}
119
120#[cfg(test)]
121mod test {
122    use std::sync::LazyLock;
123
124    use vortex_array::session::ArraySessionExt;
125    use vortex_buffer::BitBufferMut;
126    use vortex_session::VortexSession;
127
128    use super::*;
129
130    pub static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
131        let session = VortexSession::empty();
132        session.arrays().register(BitPacked);
133        session.arrays().register(Delta);
134        session.arrays().register(FoR);
135        session.arrays().register(RLE);
136        session
137    });
138
139    #[test]
140    fn fill_forward_nulls_resets_at_chunk_boundary() {
141        // Build a buffer spanning two chunks where the last valid value in chunk 0
142        // is non-zero. Null positions at the start of chunk 1 must get T::default()
143        // (0), not the carry-over from chunk 0.
144        let mut values = BufferMut::zeroed(2 * FL_CHUNK_SIZE);
145        // Place a non-zero valid value near the end of chunk 0.
146        values[FL_CHUNK_SIZE - 1] = 42;
147
148        let mut validity_bits = BitBufferMut::new_unset(2 * FL_CHUNK_SIZE);
149        validity_bits.set(FL_CHUNK_SIZE - 1); // only this position is valid
150
151        let validity = Validity::from(validity_bits.freeze());
152        let result = fill_forward_nulls(values.freeze(), &validity);
153
154        // Within chunk 0, nulls before the valid element get 0 (default), and the
155        // valid element itself is 42.
156        assert_eq!(result[FL_CHUNK_SIZE - 1], 42);
157
158        // Chunk 1 has no valid elements. Every position must be T::default() (0),
159        // NOT 42 carried over from chunk 0.
160        for i in FL_CHUNK_SIZE..2 * FL_CHUNK_SIZE {
161            assert_eq!(
162                result[i], 0,
163                "position {i} should be 0, not carried from chunk 0"
164            );
165        }
166    }
167}