vortex_array/arrays/primitive/
mod.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::iter;
6
7mod accessor;
8
9use arrow_buffer::BooleanBufferBuilder;
10use vortex_buffer::{Alignment, Buffer, BufferMut, ByteBuffer, ByteBufferMut};
11use vortex_dtype::{DType, NativePType, Nullability, PType, match_each_native_ptype};
12use vortex_error::{VortexResult, vortex_panic};
13
14use crate::builders::ArrayBuilder;
15use crate::stats::{ArrayStats, StatsSetRef};
16use crate::validity::Validity;
17use crate::{Array, ArrayRef, Canonical, EncodingId, EncodingRef, IntoArray, vtable};
18
19mod compute;
20mod native_value;
21mod ops;
22mod patch;
23mod serde;
24mod top_value;
25
26pub use compute::{IS_CONST_LANE_WIDTH, compute_is_constant};
27pub use native_value::NativeValue;
28
29use crate::vtable::{
30    ArrayVTable, CanonicalVTable, NotSupported, VTable, ValidityHelper,
31    ValidityVTableFromValidityHelper,
32};
33
34vtable!(Primitive);
35
36impl VTable for PrimitiveVTable {
37    type Array = PrimitiveArray;
38    type Encoding = PrimitiveEncoding;
39
40    type ArrayVTable = Self;
41    type CanonicalVTable = Self;
42    type OperationsVTable = Self;
43    type ValidityVTable = ValidityVTableFromValidityHelper;
44    type VisitorVTable = Self;
45    type ComputeVTable = NotSupported;
46    type EncodeVTable = NotSupported;
47    type SerdeVTable = Self;
48
49    fn id(_encoding: &Self::Encoding) -> EncodingId {
50        EncodingId::new_ref("vortex.primitive")
51    }
52
53    fn encoding(_array: &Self::Array) -> EncodingRef {
54        EncodingRef::new_ref(PrimitiveEncoding.as_ref())
55    }
56}
57
58#[derive(Clone, Debug)]
59pub struct PrimitiveArray {
60    dtype: DType,
61    buffer: ByteBuffer,
62    validity: Validity,
63    stats_set: ArrayStats,
64}
65
/// Stateless marker type identifying the primitive encoding (`"vortex.primitive"`).
#[derive(Debug, Clone)]
pub struct PrimitiveEncoding;
68
69impl PrimitiveArray {
70    pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
71        let buffer = buffer.into();
72        if let Some(len) = validity.maybe_len() {
73            if buffer.len() != len {
74                vortex_panic!(
75                    "Buffer and validity length mismatch: buffer={}, validity={}",
76                    buffer.len(),
77                    len
78                );
79            }
80        }
81        Self {
82            dtype: DType::Primitive(T::PTYPE, validity.nullability()),
83            buffer: buffer.into_byte_buffer(),
84            validity,
85            stats_set: Default::default(),
86        }
87    }
88
89    pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
90        Self::new(Buffer::<T>::empty(), nullability.into())
91    }
92
93    pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
94        match_each_native_ptype!(ptype, |T| {
95            Self::new::<T>(Buffer::from_byte_buffer(buffer), validity)
96        })
97    }
98
99    /// Create a PrimitiveArray from an iterator of `T`.
100    /// NOTE: we cannot impl FromIterator trait since it conflicts with `FromIterator<T>`.
101    pub fn from_option_iter<T: NativePType, I: IntoIterator<Item = Option<T>>>(iter: I) -> Self {
102        let iter = iter.into_iter();
103        let mut values = BufferMut::with_capacity(iter.size_hint().0);
104        let mut validity = BooleanBufferBuilder::new(values.capacity());
105
106        for i in iter {
107            match i {
108                None => {
109                    validity.append(false);
110                    values.push(T::default());
111                }
112                Some(e) => {
113                    validity.append(true);
114                    values.push(e);
115                }
116            }
117        }
118        Self::new(values.freeze(), Validity::from(validity.finish()))
119    }
120
121    /// Create a PrimitiveArray from a byte buffer containing only the valid elements.
122    pub fn from_values_byte_buffer(
123        valid_elems_buffer: ByteBuffer,
124        ptype: PType,
125        validity: Validity,
126        n_rows: usize,
127    ) -> VortexResult<Self> {
128        let byte_width = ptype.byte_width();
129        let alignment = Alignment::new(byte_width);
130        let buffer = match &validity {
131            Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment),
132            Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment),
133            Validity::Array(is_valid) => {
134                let bool_array = is_valid.to_canonical()?.into_bool()?;
135                let bool_buffer = bool_array.boolean_buffer();
136                let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment);
137                for (i, valid_i) in bool_buffer.set_indices().enumerate() {
138                    bytes[valid_i * byte_width..(valid_i + 1) * byte_width]
139                        .copy_from_slice(&valid_elems_buffer[i * byte_width..(i + 1) * byte_width])
140                }
141                bytes.freeze()
142            }
143        };
144
145        Ok(Self::from_byte_buffer(buffer, ptype, validity))
146    }
147
148    pub fn ptype(&self) -> PType {
149        self.dtype().as_ptype()
150    }
151
152    pub fn byte_buffer(&self) -> &ByteBuffer {
153        &self.buffer
154    }
155
156    pub fn into_byte_buffer(self) -> ByteBuffer {
157        self.buffer
158    }
159
160    pub fn buffer<T: NativePType>(&self) -> Buffer<T> {
161        if T::PTYPE != self.ptype() {
162            vortex_panic!(
163                "Attempted to get buffer of type {} from array of type {}",
164                T::PTYPE,
165                self.ptype()
166            )
167        }
168        Buffer::from_byte_buffer(self.byte_buffer().clone())
169    }
170
171    pub fn into_buffer<T: NativePType>(self) -> Buffer<T> {
172        if T::PTYPE != self.ptype() {
173            vortex_panic!(
174                "Attempted to get buffer of type {} from array of type {}",
175                T::PTYPE,
176                self.ptype()
177            )
178        }
179        Buffer::from_byte_buffer(self.buffer)
180    }
181
182    /// Extract a mutable buffer from the PrimitiveArray. Attempts to do this with zero-copy
183    /// if the buffer is uniquely owned, otherwise will make a copy.
184    pub fn into_buffer_mut<T: NativePType>(self) -> BufferMut<T> {
185        if T::PTYPE != self.ptype() {
186            vortex_panic!(
187                "Attempted to get buffer_mut of type {} from array of type {}",
188                T::PTYPE,
189                self.ptype()
190            )
191        }
192        self.into_buffer()
193            .try_into_mut()
194            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
195    }
196
197    /// Try to extract a mutable buffer from the PrimitiveArray with zero copy.
198    #[allow(clippy::panic_in_result_fn)]
199    pub fn try_into_buffer_mut<T: NativePType>(self) -> Result<BufferMut<T>, PrimitiveArray> {
200        if T::PTYPE != self.ptype() {
201            vortex_panic!(
202                "Attempted to get buffer_mut of type {} from array of type {}",
203                T::PTYPE,
204                self.ptype()
205            )
206        }
207        let validity = self.validity().clone();
208        Buffer::<T>::from_byte_buffer(self.into_byte_buffer())
209            .try_into_mut()
210            .map_err(|buffer| PrimitiveArray::new(buffer, validity))
211    }
212
213    /// Map each element in the array to a new value.
214    ///
215    /// This ignores validity and maps over all maybe-null elements.
216    ///
217    /// TODO(ngates): we could be smarter here if validity is sparse and only run the function
218    ///   over the valid elements.
219    pub fn map_each<T, R, F>(self, f: F) -> PrimitiveArray
220    where
221        T: NativePType,
222        R: NativePType,
223        F: FnMut(T) -> R,
224    {
225        let validity = self.validity().clone();
226        let buffer = match self.try_into_buffer_mut() {
227            Ok(buffer_mut) => buffer_mut.map_each(f),
228            Err(parray) => BufferMut::<R>::from_iter(parray.buffer::<T>().iter().copied().map(f)),
229        };
230        PrimitiveArray::new(buffer.freeze(), validity)
231    }
232
233    /// Map each element in the array to a new value.
234    ///
235    /// This doesn't ignore validity and maps over all maybe-null elements, with a bool true if
236    /// valid and false otherwise.
237    pub fn map_each_with_validity<T, R, F>(self, f: F) -> VortexResult<PrimitiveArray>
238    where
239        T: NativePType,
240        R: NativePType,
241        F: FnMut((T, bool)) -> R,
242    {
243        let validity = self.validity();
244
245        let buf_iter = self.buffer::<T>().into_iter();
246
247        let buffer = match &validity {
248            Validity::NonNullable | Validity::AllValid => {
249                BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(true)).map(f))
250            }
251            Validity::AllInvalid => {
252                BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(false)).map(f))
253            }
254            Validity::Array(val) => {
255                let val = val.to_canonical()?.into_bool()?;
256                BufferMut::<R>::from_iter(buf_iter.zip(val.boolean_buffer()).map(f))
257            }
258        };
259        Ok(PrimitiveArray::new(buffer.freeze(), validity.clone()))
260    }
261
262    /// Return a slice of the array's buffer.
263    ///
264    /// NOTE: these values may be nonsense if the validity buffer indicates that the value is null.
265    pub fn as_slice<T: NativePType>(&self) -> &[T] {
266        if T::PTYPE != self.ptype() {
267            vortex_panic!(
268                "Attempted to get slice of type {} from array of type {}",
269                T::PTYPE,
270                self.ptype()
271            )
272        }
273        let raw_slice = self.byte_buffer().as_ptr();
274        // SAFETY: alignment of Buffer is checked on construction
275        unsafe {
276            std::slice::from_raw_parts(raw_slice.cast(), self.byte_buffer().len() / size_of::<T>())
277        }
278    }
279
280    pub fn reinterpret_cast(&self, ptype: PType) -> Self {
281        if self.ptype() == ptype {
282            return self.clone();
283        }
284
285        assert_eq!(
286            self.ptype().byte_width(),
287            ptype.byte_width(),
288            "can't reinterpret cast between integers of two different widths"
289        );
290
291        PrimitiveArray::from_byte_buffer(self.byte_buffer().clone(), ptype, self.validity().clone())
292    }
293}
294
295impl ArrayVTable<PrimitiveVTable> for PrimitiveVTable {
296    fn len(array: &PrimitiveArray) -> usize {
297        array.byte_buffer().len() / array.ptype().byte_width()
298    }
299
300    fn dtype(array: &PrimitiveArray) -> &DType {
301        &array.dtype
302    }
303
304    fn stats(array: &PrimitiveArray) -> StatsSetRef<'_> {
305        array.stats_set.to_ref(array.as_ref())
306    }
307}
308
309impl ValidityHelper for PrimitiveArray {
310    fn validity(&self) -> &Validity {
311        &self.validity
312    }
313}
314
315impl<T: NativePType> FromIterator<T> for PrimitiveArray {
316    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
317        let values = BufferMut::from_iter(iter);
318        PrimitiveArray::new(values.freeze(), Validity::NonNullable)
319    }
320}
321
322impl<T: NativePType> IntoArray for Buffer<T> {
323    fn into_array(self) -> ArrayRef {
324        PrimitiveArray::new(self, Validity::NonNullable).into_array()
325    }
326}
327
328impl<T: NativePType> IntoArray for BufferMut<T> {
329    fn into_array(self) -> ArrayRef {
330        self.freeze().into_array()
331    }
332}
333
334impl CanonicalVTable<PrimitiveVTable> for PrimitiveVTable {
335    fn canonicalize(array: &PrimitiveArray) -> VortexResult<Canonical> {
336        Ok(Canonical::Primitive(array.clone()))
337    }
338
339    fn append_to_builder(
340        array: &PrimitiveArray,
341        builder: &mut dyn ArrayBuilder,
342    ) -> VortexResult<()> {
343        builder.extend_from_array(array.as_ref())
344    }
345}
346
#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;
    use vortex_scalar::PValue;

    use crate::arrays::{BoolArray, PrimitiveArray};
    use crate::compute::conformance::mask::test_mask;
    use crate::compute::conformance::search_sorted::rstest_reuse::apply;
    use crate::compute::conformance::search_sorted::{search_sorted_conformance, *};
    use crate::search_sorted::{SearchResult, SearchSorted, SearchSortedSide};
    use crate::validity::Validity;
    use crate::{ArrayRef, IntoArray};

    // Runs the shared search-sorted conformance cases through the typed primitive view.
    #[apply(search_sorted_conformance)]
    fn search_sorted_primitive(
        #[case] array: ArrayRef,
        #[case] value: i32,
        #[case] side: SearchSortedSide,
        #[case] expected: SearchResult,
    ) {
        let needle = Some(PValue::from(value));
        let actual = array.as_primitive_typed().search_sorted(&needle, side);
        assert_eq!(actual, expected);
    }

    // Runs the mask conformance suite once per validity representation.
    #[test]
    fn test_mask_primitive_array() {
        let validities = [
            Validity::NonNullable,
            Validity::AllValid,
            Validity::AllInvalid,
            Validity::Array(BoolArray::from_iter([true, false, true, false, true]).into_array()),
        ];

        for validity in validities {
            let array = PrimitiveArray::new(buffer![0, 1, 2, 3, 4], validity);
            test_mask(array.as_ref());
        }
    }
}
388}