vortex_array/arrays/primitive/
mod.rs

1use std::fmt::Debug;
2use std::iter;
3
4mod accessor;
5
6use arrow_buffer::BooleanBufferBuilder;
7use vortex_buffer::{Alignment, Buffer, BufferMut, ByteBuffer, ByteBufferMut};
8use vortex_dtype::{DType, NativePType, Nullability, PType, match_each_native_ptype};
9use vortex_error::{VortexResult, vortex_panic};
10
11use crate::builders::ArrayBuilder;
12use crate::stats::{ArrayStats, StatsSetRef};
13use crate::validity::Validity;
14use crate::{Array, ArrayRef, Canonical, EncodingId, EncodingRef, IntoArray, vtable};
15
16mod compute;
17mod native_value;
18mod ops;
19mod patch;
20mod serde;
21mod top_value;
22
23pub use compute::{IS_CONST_LANE_WIDTH, compute_is_constant};
24pub use native_value::NativeValue;
25
26use crate::vtable::{
27    ArrayVTable, CanonicalVTable, NotSupported, VTable, ValidityHelper,
28    ValidityVTableFromValidityHelper,
29};
30
31vtable!(Primitive);
32
33impl VTable for PrimitiveVTable {
34    type Array = PrimitiveArray;
35    type Encoding = PrimitiveEncoding;
36
37    type ArrayVTable = Self;
38    type CanonicalVTable = Self;
39    type OperationsVTable = Self;
40    type ValidityVTable = ValidityVTableFromValidityHelper;
41    type VisitorVTable = Self;
42    type ComputeVTable = NotSupported;
43    type EncodeVTable = NotSupported;
44    type SerdeVTable = Self;
45
46    fn id(_encoding: &Self::Encoding) -> EncodingId {
47        EncodingId::new_ref("vortex.primitive")
48    }
49
50    fn encoding(_array: &Self::Array) -> EncodingRef {
51        EncodingRef::new_ref(PrimitiveEncoding.as_ref())
52    }
53}
54
55#[derive(Clone, Debug)]
56pub struct PrimitiveArray {
57    dtype: DType,
58    buffer: ByteBuffer,
59    validity: Validity,
60    stats_set: ArrayStats,
61}
62
/// Stateless marker type for the primitive encoding (`VTable::Encoding` of `PrimitiveVTable`).
#[derive(Clone, Debug)]
pub struct PrimitiveEncoding;
65
66impl PrimitiveArray {
67    pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
68        let buffer = buffer.into();
69        if let Some(len) = validity.maybe_len() {
70            if buffer.len() != len {
71                vortex_panic!(
72                    "Buffer and validity length mismatch: buffer={}, validity={}",
73                    buffer.len(),
74                    len
75                );
76            }
77        }
78        Self {
79            dtype: DType::Primitive(T::PTYPE, validity.nullability()),
80            buffer: buffer.into_byte_buffer(),
81            validity,
82            stats_set: Default::default(),
83        }
84    }
85
86    pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
87        Self::new(Buffer::<T>::empty(), nullability.into())
88    }
89
90    pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
91        match_each_native_ptype!(ptype, |T| {
92            Self::new::<T>(Buffer::from_byte_buffer(buffer), validity)
93        })
94    }
95
96    /// Create a PrimitiveArray from an iterator of `T`.
97    /// NOTE: we cannot impl FromIterator trait since it conflicts with `FromIterator<T>`.
98    pub fn from_option_iter<T: NativePType, I: IntoIterator<Item = Option<T>>>(iter: I) -> Self {
99        let iter = iter.into_iter();
100        let mut values = BufferMut::with_capacity(iter.size_hint().0);
101        let mut validity = BooleanBufferBuilder::new(values.capacity());
102
103        for i in iter {
104            match i {
105                None => {
106                    validity.append(false);
107                    values.push(T::default());
108                }
109                Some(e) => {
110                    validity.append(true);
111                    values.push(e);
112                }
113            }
114        }
115        Self::new(values.freeze(), Validity::from(validity.finish()))
116    }
117
118    /// Create a PrimitiveArray from a byte buffer containing only the valid elements.
119    pub fn from_values_byte_buffer(
120        valid_elems_buffer: ByteBuffer,
121        ptype: PType,
122        validity: Validity,
123        n_rows: usize,
124    ) -> VortexResult<Self> {
125        let byte_width = ptype.byte_width();
126        let alignment = Alignment::new(byte_width);
127        let buffer = match &validity {
128            Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment),
129            Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment),
130            Validity::Array(is_valid) => {
131                let bool_array = is_valid.to_canonical()?.into_bool()?;
132                let bool_buffer = bool_array.boolean_buffer();
133                let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment);
134                for (i, valid_i) in bool_buffer.set_indices().enumerate() {
135                    bytes[valid_i * byte_width..(valid_i + 1) * byte_width]
136                        .copy_from_slice(&valid_elems_buffer[i * byte_width..(i + 1) * byte_width])
137                }
138                bytes.freeze()
139            }
140        };
141
142        Ok(Self::from_byte_buffer(buffer, ptype, validity))
143    }
144
145    pub fn ptype(&self) -> PType {
146        self.dtype().as_ptype()
147    }
148
149    pub fn byte_buffer(&self) -> &ByteBuffer {
150        &self.buffer
151    }
152
153    pub fn into_byte_buffer(self) -> ByteBuffer {
154        self.buffer
155    }
156
157    pub fn buffer<T: NativePType>(&self) -> Buffer<T> {
158        if T::PTYPE != self.ptype() {
159            vortex_panic!(
160                "Attempted to get buffer of type {} from array of type {}",
161                T::PTYPE,
162                self.ptype()
163            )
164        }
165        Buffer::from_byte_buffer(self.byte_buffer().clone())
166    }
167
168    pub fn into_buffer<T: NativePType>(self) -> Buffer<T> {
169        if T::PTYPE != self.ptype() {
170            vortex_panic!(
171                "Attempted to get buffer of type {} from array of type {}",
172                T::PTYPE,
173                self.ptype()
174            )
175        }
176        Buffer::from_byte_buffer(self.buffer)
177    }
178
179    /// Extract a mutable buffer from the PrimitiveArray. Attempts to do this with zero-copy
180    /// if the buffer is uniquely owned, otherwise will make a copy.
181    pub fn into_buffer_mut<T: NativePType>(self) -> BufferMut<T> {
182        if T::PTYPE != self.ptype() {
183            vortex_panic!(
184                "Attempted to get buffer_mut of type {} from array of type {}",
185                T::PTYPE,
186                self.ptype()
187            )
188        }
189        self.into_buffer()
190            .try_into_mut()
191            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
192    }
193
194    /// Try to extract a mutable buffer from the PrimitiveArray with zero copy.
195    #[allow(clippy::panic_in_result_fn)]
196    pub fn try_into_buffer_mut<T: NativePType>(self) -> Result<BufferMut<T>, PrimitiveArray> {
197        if T::PTYPE != self.ptype() {
198            vortex_panic!(
199                "Attempted to get buffer_mut of type {} from array of type {}",
200                T::PTYPE,
201                self.ptype()
202            )
203        }
204        let validity = self.validity().clone();
205        Buffer::<T>::from_byte_buffer(self.into_byte_buffer())
206            .try_into_mut()
207            .map_err(|buffer| PrimitiveArray::new(buffer, validity))
208    }
209
210    /// Map each element in the array to a new value.
211    ///
212    /// This ignores validity and maps over all maybe-null elements.
213    ///
214    /// TODO(ngates): we could be smarter here if validity is sparse and only run the function
215    ///   over the valid elements.
216    pub fn map_each<T, R, F>(self, f: F) -> PrimitiveArray
217    where
218        T: NativePType,
219        R: NativePType,
220        F: FnMut(T) -> R,
221    {
222        let validity = self.validity().clone();
223        let buffer = match self.try_into_buffer_mut() {
224            Ok(buffer_mut) => buffer_mut.map_each(f),
225            Err(parray) => BufferMut::<R>::from_iter(parray.buffer::<T>().iter().copied().map(f)),
226        };
227        PrimitiveArray::new(buffer.freeze(), validity)
228    }
229
230    /// Map each element in the array to a new value.
231    ///
232    /// This doesn't ignore validity and maps over all maybe-null elements, with a bool true if
233    /// valid and false otherwise.
234    pub fn map_each_with_validity<T, R, F>(self, f: F) -> VortexResult<PrimitiveArray>
235    where
236        T: NativePType,
237        R: NativePType,
238        F: FnMut((T, bool)) -> R,
239    {
240        let validity = self.validity();
241
242        let buf_iter = self.buffer::<T>().into_iter();
243
244        let buffer = match &validity {
245            Validity::NonNullable | Validity::AllValid => {
246                BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(true)).map(f))
247            }
248            Validity::AllInvalid => {
249                BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(false)).map(f))
250            }
251            Validity::Array(val) => {
252                let val = val.to_canonical()?.into_bool()?;
253                BufferMut::<R>::from_iter(buf_iter.zip(val.boolean_buffer()).map(f))
254            }
255        };
256        Ok(PrimitiveArray::new(buffer.freeze(), validity.clone()))
257    }
258
259    /// Return a slice of the array's buffer.
260    ///
261    /// NOTE: these values may be nonsense if the validity buffer indicates that the value is null.
262    pub fn as_slice<T: NativePType>(&self) -> &[T] {
263        if T::PTYPE != self.ptype() {
264            vortex_panic!(
265                "Attempted to get slice of type {} from array of type {}",
266                T::PTYPE,
267                self.ptype()
268            )
269        }
270        let raw_slice = self.byte_buffer().as_ptr();
271        // SAFETY: alignment of Buffer is checked on construction
272        unsafe {
273            std::slice::from_raw_parts(raw_slice.cast(), self.byte_buffer().len() / size_of::<T>())
274        }
275    }
276
277    pub fn reinterpret_cast(&self, ptype: PType) -> Self {
278        if self.ptype() == ptype {
279            return self.clone();
280        }
281
282        assert_eq!(
283            self.ptype().byte_width(),
284            ptype.byte_width(),
285            "can't reinterpret cast between integers of two different widths"
286        );
287
288        PrimitiveArray::from_byte_buffer(self.byte_buffer().clone(), ptype, self.validity().clone())
289    }
290}
291
292impl ArrayVTable<PrimitiveVTable> for PrimitiveVTable {
293    fn len(array: &PrimitiveArray) -> usize {
294        array.byte_buffer().len() / array.ptype().byte_width()
295    }
296
297    fn dtype(array: &PrimitiveArray) -> &DType {
298        &array.dtype
299    }
300
301    fn stats(array: &PrimitiveArray) -> StatsSetRef<'_> {
302        array.stats_set.to_ref(array.as_ref())
303    }
304}
305
306impl ValidityHelper for PrimitiveArray {
307    fn validity(&self) -> &Validity {
308        &self.validity
309    }
310}
311
312impl<T: NativePType> FromIterator<T> for PrimitiveArray {
313    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
314        let values = BufferMut::from_iter(iter);
315        PrimitiveArray::new(values.freeze(), Validity::NonNullable)
316    }
317}
318
319impl<T: NativePType> IntoArray for Buffer<T> {
320    fn into_array(self) -> ArrayRef {
321        PrimitiveArray::new(self, Validity::NonNullable).into_array()
322    }
323}
324
325impl<T: NativePType> IntoArray for BufferMut<T> {
326    fn into_array(self) -> ArrayRef {
327        self.freeze().into_array()
328    }
329}
330
331impl CanonicalVTable<PrimitiveVTable> for PrimitiveVTable {
332    fn canonicalize(array: &PrimitiveArray) -> VortexResult<Canonical> {
333        Ok(Canonical::Primitive(array.clone()))
334    }
335
336    fn append_to_builder(
337        array: &PrimitiveArray,
338        builder: &mut dyn ArrayBuilder,
339    ) -> VortexResult<()> {
340        builder.extend_from_array(array.as_ref())
341    }
342}
343
#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;
    use vortex_scalar::PValue;

    use crate::arrays::{BoolArray, PrimitiveArray};
    use crate::compute::conformance::mask::test_mask;
    use crate::compute::conformance::search_sorted::rstest_reuse::apply;
    use crate::compute::conformance::search_sorted::{search_sorted_conformance, *};
    use crate::search_sorted::{SearchResult, SearchSorted, SearchSortedSide};
    use crate::validity::Validity;
    use crate::{ArrayRef, IntoArray};

    /// Runs the shared search-sorted conformance cases against the typed primitive view.
    #[apply(search_sorted_conformance)]
    fn search_sorted_primitive(
        #[case] array: ArrayRef,
        #[case] value: i32,
        #[case] side: SearchSortedSide,
        #[case] expected: SearchResult,
    ) {
        let needle = Some(PValue::from(value));
        let actual = array.as_primitive_typed().search_sorted(&needle, side);
        assert_eq!(actual, expected);
    }

    /// Runs the mask conformance check once per validity variant over the same five values.
    #[test]
    fn test_mask_primitive_array() {
        let validities = [
            Validity::NonNullable,
            Validity::AllValid,
            Validity::AllInvalid,
            Validity::Array(BoolArray::from_iter([true, false, true, false, true]).into_array()),
        ];

        for validity in validities {
            test_mask(PrimitiveArray::new(buffer![0, 1, 2, 3, 4], validity).as_ref());
        }
    }
}
385}