1#![allow(clippy::cast_possible_truncation)]
5
6pub use bitpacking::*;
7pub use delta::*;
8pub use r#for::*;
9pub use rle::*;
10use vortex_array::ToCanonical;
11use vortex_array::validity::Validity;
12use vortex_buffer::Buffer;
13use vortex_buffer::BufferMut;
14
15pub mod bit_transpose;
16mod bitpacking;
17mod delta;
18mod r#for;
19mod rle;
20
21pub(crate) const FL_CHUNK_SIZE: usize = 1024;
22
23use bitpacking::compute::is_constant::BitPackedIsConstantKernel;
24use r#for::compute::is_constant::FoRIsConstantKernel;
25use r#for::compute::is_sorted::FoRIsSortedKernel;
26use vortex_array::aggregate_fn::AggregateFnVTable;
27use vortex_array::aggregate_fn::fns::is_constant::IsConstant;
28use vortex_array::aggregate_fn::fns::is_sorted::IsSorted;
29use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
30use vortex_array::session::ArraySessionExt;
31use vortex_session::VortexSession;
32
33pub fn initialize(session: &mut VortexSession) {
35 session.arrays().register(BitPacked);
36 session.arrays().register(Delta);
37 session.arrays().register(FoR);
38 session.arrays().register(RLE);
39
40 session.aggregate_fns().register_aggregate_kernel(
42 BitPacked::ID,
43 Some(IsConstant.id()),
44 &BitPackedIsConstantKernel,
45 );
46 session.aggregate_fns().register_aggregate_kernel(
47 FoR::ID,
48 Some(IsConstant.id()),
49 &FoRIsConstantKernel,
50 );
51 session.aggregate_fns().register_aggregate_kernel(
52 FoR::ID,
53 Some(IsSorted.id()),
54 &FoRIsSortedKernel,
55 );
56}
57
58pub(crate) fn fill_forward_nulls<T: Copy + Default>(
69 values: Buffer<T>,
70 validity: &Validity,
71) -> Buffer<T> {
72 match validity {
73 Validity::NonNullable | Validity::AllValid => values,
74 Validity::AllInvalid => Buffer::zeroed(values.len()),
75 Validity::Array(validity_array) => {
76 let bit_buffer = validity_array.to_bool().to_bit_buffer();
77 let mut last_valid = T::default();
78 match values.try_into_mut() {
79 Ok(mut to_fill_mut) => {
80 for (i, (v, is_valid)) in
81 to_fill_mut.iter_mut().zip(bit_buffer.iter()).enumerate()
82 {
83 if is_valid {
84 last_valid = *v;
85 } else if i.is_multiple_of(FL_CHUNK_SIZE) {
86 last_valid = T::default();
87 } else {
88 *v = last_valid;
89 }
90 }
91 to_fill_mut.freeze()
92 }
93 Err(to_fill) => {
94 let mut to_fill_mut = BufferMut::<T>::with_capacity(to_fill.len());
95 for (i, (v, (out, is_valid))) in to_fill
96 .iter()
97 .zip(
98 to_fill_mut
99 .spare_capacity_mut()
100 .iter_mut()
101 .zip(bit_buffer.iter()),
102 )
103 .enumerate()
104 {
105 if is_valid {
106 last_valid = *v;
107 } else if i.is_multiple_of(FL_CHUNK_SIZE) {
108 last_valid = T::default();
109 }
110 out.write(last_valid);
111 }
112 unsafe { to_fill_mut.set_len(to_fill.len()) };
113 to_fill_mut.freeze()
114 }
115 }
116 }
117 }
118}
119
#[cfg(test)]
mod test {
    use std::sync::LazyLock;

    use vortex_array::session::ArraySessionExt;
    use vortex_buffer::BitBufferMut;
    use vortex_session::VortexSession;

    use super::*;

    /// Shared test session with the `BitPacked`, `Delta`, `FoR` and `RLE` encodings registered.
    ///
    /// Only the array encodings are registered here; no aggregate kernels are added.
    pub static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
        let fastlanes_session = VortexSession::empty();
        fastlanes_session.arrays().register(BitPacked);
        fastlanes_session.arrays().register(Delta);
        fastlanes_session.arrays().register(FoR);
        fastlanes_session.arrays().register(RLE);
        fastlanes_session
    });

    /// A valid value in the last slot of chunk 0 must not be carried into the all-null chunk 1.
    #[test]
    fn fill_forward_nulls_resets_at_chunk_boundary() {
        let total_len = 2 * FL_CHUNK_SIZE;
        let last_of_first_chunk = FL_CHUNK_SIZE - 1;

        // Two chunks of zeros with a single non-zero value at the end of the first chunk.
        let mut raw = BufferMut::zeroed(total_len);
        raw[last_of_first_chunk] = 42;

        // Only that single position is marked valid; everything else is null.
        let mut mask = BitBufferMut::new_unset(total_len);
        mask.set(last_of_first_chunk);
        let validity = Validity::from(mask.freeze());

        let filled = fill_forward_nulls(raw.freeze(), &validity);

        assert_eq!(filled[last_of_first_chunk], 42);

        // Every slot of the second chunk is null and must come out as 0.
        (FL_CHUNK_SIZE..total_len).for_each(|i| {
            assert_eq!(
                filled[i], 0,
                "position {i} should be 0, not carried from chunk 0"
            );
        });
    }
}