Skip to main content

vortex_array/arrays/primitive/array/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Display;
5use std::fmt::Formatter;
6use std::iter;
7
8use vortex_buffer::Alignment;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_err;
16use vortex_error::vortex_panic;
17
18use crate::LEGACY_SESSION;
19use crate::ToCanonical;
20use crate::VortexSessionExecute;
21use crate::array::Array;
22use crate::array::ArrayParts;
23use crate::array::TypedArrayRef;
24use crate::arrays::Primitive;
25use crate::arrays::PrimitiveArray;
26use crate::dtype::DType;
27use crate::dtype::NativePType;
28use crate::dtype::Nullability;
29use crate::dtype::PType;
30use crate::match_each_native_ptype;
31use crate::validity::Validity;
32
33mod accessor;
34mod cast;
35mod conversion;
36mod patch;
37mod top_value;
38
39pub use patch::chunk_range;
40pub use patch::patch_chunk;
41
42use crate::ArrayRef;
43use crate::aggregate_fn::fns::min_max::min_max;
44use crate::array::child_to_validity;
45use crate::array::validity_to_child;
46use crate::arrays::bool::BoolArrayExt;
47use crate::buffer::BufferHandle;
48use crate::builtins::ArrayBuiltins;
49
/// Index of the validity child within a primitive array's slots.
///
/// The slot holds an `Option<ArrayRef>`. `PrimitiveArrayExt::validity` turns it back into a
/// [`Validity`] together with the dtype's nullability.
pub(super) const VALIDITY_SLOT: usize = 0;
/// Total number of child slots a primitive array carries.
pub(super) const NUM_SLOTS: usize = 1;
/// Name of each slot, indexed by slot position (so `SLOT_NAMES[VALIDITY_SLOT] == "validity"`).
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["validity"];
54
/// A primitive array that stores [native types][crate::dtype::NativePType] in a contiguous buffer
/// of memory, along with an optional validity child.
///
/// This mirrors the Apache Arrow Primitive layout and can be converted into and out of one
/// without allocations or copies.
///
/// The underlying buffer must be natively aligned to the primitive type it represents.
///
/// Values are stored in their native representation with proper alignment.
/// Null values still occupy space in the buffer but are marked invalid in the validity mask.
///
/// `PrimitiveData` itself holds only the element type and the value buffer. The validity child
/// is stored in the owning array's slots (see `VALIDITY_SLOT`), not in this struct.
///
/// # Examples
///
/// ```
/// # fn main() -> vortex_error::VortexResult<()> {
/// use vortex_array::arrays::PrimitiveArray;
/// use vortex_array::{LEGACY_SESSION, VortexSessionExecute};
///
/// // Create from iterator using FromIterator impl
/// let array: PrimitiveArray = [1i32, 2, 3, 4, 5].into_iter().collect();
///
/// // Slice the array
/// let sliced = array.slice(1..3)?;
///
/// // Access individual values
/// let mut ctx = LEGACY_SESSION.create_execution_ctx();
/// let value = sliced.execute_scalar(0, &mut ctx).unwrap();
/// assert_eq!(value, 2i32.into());
///
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct PrimitiveData {
    /// Element type of the values held in `buffer`.
    pub(super) ptype: PType,
    /// Raw value bytes; the element count is `buffer.len() / ptype.byte_width()`.
    pub(super) buffer: BufferHandle,
}
92
93impl Display for PrimitiveData {
94    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
95        write!(f, "ptype: {}", self.ptype)
96    }
97}
98
/// The owned components of a primitive array, as produced by `into_data_parts`.
pub struct PrimitiveDataParts {
    /// Element type of the values held in `buffer`.
    pub ptype: PType,
    /// Raw value bytes.
    pub buffer: BufferHandle,
    /// Null mask, reconstructed from the array's validity slot and nullability.
    pub validity: Validity,
}
104
105pub trait PrimitiveArrayExt: TypedArrayRef<Primitive> {
106    fn ptype(&self) -> PType {
107        match self.as_ref().dtype() {
108            DType::Primitive(ptype, _) => *ptype,
109            _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
110        }
111    }
112
113    fn nullability(&self) -> Nullability {
114        match self.as_ref().dtype() {
115            DType::Primitive(_, nullability) => *nullability,
116            _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
117        }
118    }
119
120    fn validity_child(&self) -> Option<&ArrayRef> {
121        self.as_ref().slots()[VALIDITY_SLOT].as_ref()
122    }
123
124    fn validity(&self) -> Validity {
125        child_to_validity(&self.as_ref().slots()[VALIDITY_SLOT], self.nullability())
126    }
127
128    fn buffer_handle(&self) -> &BufferHandle {
129        &self.buffer
130    }
131
132    fn reinterpret_cast(&self, ptype: PType) -> PrimitiveArray {
133        if self.ptype() == ptype {
134            return self.to_owned();
135        }
136
137        assert_eq!(
138            self.ptype().byte_width(),
139            ptype.byte_width(),
140            "can't reinterpret cast between integers of two different widths"
141        );
142
143        PrimitiveArray::from_buffer_handle(self.buffer_handle().clone(), ptype, self.validity())
144    }
145
146    /// Narrow the array to the smallest possible integer type that can represent all values.
147    fn narrow(&self) -> VortexResult<PrimitiveArray> {
148        if !self.ptype().is_int() {
149            return Ok(self.to_owned());
150        }
151
152        let mut ctx = LEGACY_SESSION.create_execution_ctx();
153        let Some(min_max) = min_max(self.as_ref(), &mut ctx)? else {
154            return Ok(PrimitiveArray::new(
155                Buffer::<u8>::zeroed(self.len()),
156                self.validity(),
157            ));
158        };
159
160        // If we can't cast to i64, then leave the array as its original type.
161        // It's too big to downcast anyway.
162        let Ok(min) = min_max
163            .min
164            .cast(&PType::I64.into())
165            .and_then(|s| i64::try_from(&s))
166        else {
167            return Ok(self.to_owned());
168        };
169        let Ok(max) = min_max
170            .max
171            .cast(&PType::I64.into())
172            .and_then(|s| i64::try_from(&s))
173        else {
174            return Ok(self.to_owned());
175        };
176
177        let nullability = self.as_ref().dtype().nullability();
178
179        if min < 0 || max < 0 {
180            // Signed
181            if min >= i8::MIN as i64 && max <= i8::MAX as i64 {
182                return Ok(self
183                    .as_ref()
184                    .cast(DType::Primitive(PType::I8, nullability))?
185                    .to_primitive());
186            }
187
188            if min >= i16::MIN as i64 && max <= i16::MAX as i64 {
189                return Ok(self
190                    .as_ref()
191                    .cast(DType::Primitive(PType::I16, nullability))?
192                    .to_primitive());
193            }
194
195            if min >= i32::MIN as i64 && max <= i32::MAX as i64 {
196                return Ok(self
197                    .as_ref()
198                    .cast(DType::Primitive(PType::I32, nullability))?
199                    .to_primitive());
200            }
201        } else {
202            // Unsigned
203            if max <= u8::MAX as i64 {
204                return Ok(self
205                    .as_ref()
206                    .cast(DType::Primitive(PType::U8, nullability))?
207                    .to_primitive());
208            }
209
210            if max <= u16::MAX as i64 {
211                return Ok(self
212                    .as_ref()
213                    .cast(DType::Primitive(PType::U16, nullability))?
214                    .to_primitive());
215            }
216
217            if max <= u32::MAX as i64 {
218                return Ok(self
219                    .as_ref()
220                    .cast(DType::Primitive(PType::U32, nullability))?
221                    .to_primitive());
222            }
223        }
224
225        Ok(self.to_owned())
226    }
227}
228impl<T: TypedArrayRef<Primitive>> PrimitiveArrayExt for T {}
229
230// TODO(connor): There are a lot of places where we could be using `new_unchecked` in the codebase.
231impl PrimitiveData {
232    /// Build the slots vector for this array.
233    pub(super) fn make_slots(validity: &Validity, len: usize) -> Vec<Option<ArrayRef>> {
234        vec![validity_to_child(validity, len)]
235    }
236
237    /// Create a new array from a buffer handle.
238    ///
239    /// # Safety
240    ///
241    /// Should ensure that the provided BufferHandle points at sufficiently large region of aligned
242    /// memory to hold the `ptype` values.
243    pub unsafe fn new_unchecked_from_handle(
244        handle: BufferHandle,
245        ptype: PType,
246        _validity: Validity,
247    ) -> Self {
248        Self {
249            ptype,
250            buffer: handle,
251        }
252    }
253
254    /// Creates a new `PrimitiveArray`.
255    ///
256    /// # Panics
257    ///
258    /// Panics if the provided components do not satisfy the invariants documented
259    /// in `PrimitiveArray::new_unchecked`.
260    pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
261        let buffer = buffer.into();
262        Self::try_new(buffer, validity).vortex_expect("PrimitiveArray construction failed")
263    }
264
265    /// Constructs a new `PrimitiveArray`.
266    ///
267    /// See `PrimitiveArray::new_unchecked` for more information.
268    ///
269    /// # Errors
270    ///
271    /// Returns an error if the provided components do not satisfy the invariants documented in
272    /// `PrimitiveArray::new_unchecked`.
273    #[inline]
274    pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
275        Self::validate(&buffer, &validity)?;
276
277        // SAFETY: validate ensures all invariants are met.
278        Ok(unsafe { Self::new_unchecked(buffer, validity) })
279    }
280
281    /// Creates a new `PrimitiveArray` without validation from these components:
282    ///
283    /// * `buffer` is a typed buffer containing the primitive values.
284    /// * `validity` holds the null values.
285    ///
286    /// # Safety
287    ///
288    /// The caller must ensure all of the following invariants are satisfied:
289    ///
290    /// ## Validity Requirements
291    ///
292    /// - If `validity` is [`Validity::Array`], its length must exactly equal `buffer.len()`.
293    #[inline]
294    pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, _validity: Validity) -> Self {
295        #[cfg(debug_assertions)]
296        Self::validate(&buffer, &_validity)
297            .vortex_expect("[Debug Assertion]: Invalid `PrimitiveArray` parameters");
298
299        Self {
300            ptype: T::PTYPE,
301            buffer: BufferHandle::new_host(buffer.into_byte_buffer()),
302        }
303    }
304
305    /// Validates the components that would be used to create a `PrimitiveArray`.
306    ///
307    /// This function checks all the invariants required by `PrimitiveArray::new_unchecked`.
308    #[inline]
309    pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
310        if let Some(len) = validity.maybe_len()
311            && buffer.len() != len
312        {
313            return Err(vortex_err!(
314                InvalidArgument:
315                "Buffer and validity length mismatch: buffer={}, validity={}",
316                buffer.len(),
317                len
318            ));
319        }
320        Ok(())
321    }
322
323    pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
324        Self::new(Buffer::<T>::empty(), nullability.into())
325    }
326}
327
328impl Array<Primitive> {
329    pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
330        let dtype = DType::Primitive(T::PTYPE, nullability);
331        let len = 0;
332        let data = PrimitiveData::empty::<T>(nullability);
333        let slots = PrimitiveData::make_slots(&Validity::from(nullability), len);
334        unsafe {
335            Array::from_parts_unchecked(
336                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
337            )
338        }
339    }
340
341    /// Creates a new `PrimitiveArray`.
342    ///
343    /// # Panics
344    ///
345    /// Panics if the provided components do not satisfy the invariants.
346    pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
347        let buffer = buffer.into();
348        let dtype = DType::Primitive(T::PTYPE, validity.nullability());
349        let len = buffer.len();
350        let slots = PrimitiveData::make_slots(&validity, len);
351        let data = PrimitiveData::new(buffer, validity);
352        unsafe {
353            Array::from_parts_unchecked(
354                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
355            )
356        }
357    }
358
359    /// Constructs a new `PrimitiveArray`.
360    pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
361        let dtype = DType::Primitive(T::PTYPE, validity.nullability());
362        let len = buffer.len();
363        let slots = PrimitiveData::make_slots(&validity, len);
364        let data = PrimitiveData::try_new(buffer, validity)?;
365        Ok(unsafe {
366            Array::from_parts_unchecked(
367                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
368            )
369        })
370    }
371
372    /// Creates a new `PrimitiveArray` without validation.
373    ///
374    /// # Safety
375    ///
376    /// See [`PrimitiveData::new_unchecked`].
377    pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> Self {
378        let dtype = DType::Primitive(T::PTYPE, validity.nullability());
379        let len = buffer.len();
380        let slots = PrimitiveData::make_slots(&validity, len);
381        let data = unsafe { PrimitiveData::new_unchecked(buffer, validity) };
382        unsafe {
383            Array::from_parts_unchecked(
384                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
385            )
386        }
387    }
388
389    /// Create a new array from a buffer handle.
390    ///
391    /// # Safety
392    ///
393    /// See [`PrimitiveData::new_unchecked_from_handle`].
394    pub unsafe fn new_unchecked_from_handle(
395        handle: BufferHandle,
396        ptype: PType,
397        validity: Validity,
398    ) -> Self {
399        let dtype = DType::Primitive(ptype, validity.nullability());
400        let len = handle.len() / ptype.byte_width();
401        let slots = PrimitiveData::make_slots(&validity, len);
402        let data = unsafe { PrimitiveData::new_unchecked_from_handle(handle, ptype, validity) };
403        unsafe {
404            Array::from_parts_unchecked(
405                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
406            )
407        }
408    }
409
410    /// Creates a new `PrimitiveArray` from a [`BufferHandle`].
411    pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, validity: Validity) -> Self {
412        let dtype = DType::Primitive(ptype, validity.nullability());
413        let len = handle.len() / ptype.byte_width();
414        let slots = PrimitiveData::make_slots(&validity, len);
415        let data = PrimitiveData::from_buffer_handle(handle, ptype, validity);
416        Array::try_from_parts(ArrayParts::new(Primitive, dtype, len, data).with_slots(slots))
417            .vortex_expect("PrimitiveData is always valid")
418    }
419
420    /// Creates a new `PrimitiveArray` from a [`ByteBuffer`].
421    pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
422        let dtype = DType::Primitive(ptype, validity.nullability());
423        let len = buffer.len() / ptype.byte_width();
424        let slots = PrimitiveData::make_slots(&validity, len);
425        let data = PrimitiveData::from_byte_buffer(buffer, ptype, validity);
426        unsafe {
427            Array::from_parts_unchecked(
428                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
429            )
430        }
431    }
432
433    /// Create a PrimitiveArray from a byte buffer containing only the valid elements.
434    pub fn from_values_byte_buffer(
435        valid_elems_buffer: ByteBuffer,
436        ptype: PType,
437        validity: Validity,
438        n_rows: usize,
439    ) -> Self {
440        let dtype = DType::Primitive(ptype, validity.nullability());
441        let len = n_rows;
442        let slots = PrimitiveData::make_slots(&validity, len);
443        let data =
444            PrimitiveData::from_values_byte_buffer(valid_elems_buffer, ptype, validity, n_rows);
445        unsafe {
446            Array::from_parts_unchecked(
447                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
448            )
449        }
450    }
451
452    /// Validates the components that would be used to create a `PrimitiveArray`.
453    pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
454        PrimitiveData::validate(buffer, validity)
455    }
456
457    pub fn into_data_parts(self) -> PrimitiveDataParts {
458        let validity = PrimitiveArrayExt::validity(&self);
459        let ptype = PrimitiveArrayExt::ptype(&self);
460        let data = self.into_data();
461        PrimitiveDataParts {
462            ptype,
463            buffer: data.buffer,
464            validity,
465        }
466    }
467
468    pub fn map_each_with_validity<T, R, F>(self, f: F) -> VortexResult<Self>
469    where
470        T: NativePType,
471        R: NativePType,
472        F: FnMut((T, bool)) -> R,
473    {
474        let validity = PrimitiveArrayExt::validity(&self);
475        let data = self.into_data();
476        let buf_iter = data.to_buffer::<T>().into_iter();
477
478        let buffer = match &validity {
479            Validity::NonNullable | Validity::AllValid => {
480                BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(true)).map(f))
481            }
482            Validity::AllInvalid => {
483                BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(false)).map(f))
484            }
485            Validity::Array(val) => {
486                let val = val.to_bool().into_bit_buffer();
487                BufferMut::<R>::from_iter(buf_iter.zip(val.iter()).map(f))
488            }
489        };
490        Ok(PrimitiveArray::new(buffer.freeze(), validity))
491    }
492}
493
494impl PrimitiveData {
495    pub fn len(&self) -> usize {
496        self.buffer.len() / self.ptype.byte_width()
497    }
498
499    /// Returns `true` if the array is empty.
500    pub fn is_empty(&self) -> bool {
501        self.buffer.is_empty()
502    }
503
504    pub fn ptype(&self) -> PType {
505        self.ptype
506    }
507
508    /// Get access to the buffer handle backing the array.
509    pub fn buffer_handle(&self) -> &BufferHandle {
510        &self.buffer
511    }
512
513    pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, _validity: Validity) -> Self {
514        Self {
515            ptype,
516            buffer: handle,
517        }
518    }
519
520    pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
521        match_each_native_ptype!(ptype, |T| {
522            Self::new::<T>(Buffer::from_byte_buffer(buffer), validity)
523        })
524    }
525
526    /// Create a PrimitiveArray from a byte buffer containing only the valid elements.
527    pub fn from_values_byte_buffer(
528        valid_elems_buffer: ByteBuffer,
529        ptype: PType,
530        validity: Validity,
531        n_rows: usize,
532    ) -> Self {
533        let byte_width = ptype.byte_width();
534        let alignment = Alignment::new(byte_width);
535        let buffer = match &validity {
536            Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment),
537            Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment),
538            Validity::Array(is_valid) => {
539                let bool_array = is_valid.to_bool();
540                let bool_buffer = bool_array.to_bit_buffer();
541                let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment);
542                for (i, valid_i) in bool_buffer.set_indices().enumerate() {
543                    bytes[valid_i * byte_width..(valid_i + 1) * byte_width]
544                        .copy_from_slice(&valid_elems_buffer[i * byte_width..(i + 1) * byte_width])
545                }
546                bytes.freeze()
547            }
548        };
549
550        Self::from_byte_buffer(buffer, ptype, validity)
551    }
552
553    /// Get a buffer in host memory holding all the values.
554    ///
555    /// NOTE: some values may be nonsense if the validity buffer indicates that the value is null.
556    pub fn to_buffer<T: NativePType>(&self) -> Buffer<T> {
557        if T::PTYPE != self.ptype() {
558            vortex_panic!(
559                "Attempted to get buffer of type {} from array of type {}",
560                T::PTYPE,
561                self.ptype()
562            )
563        }
564        Buffer::from_byte_buffer(self.buffer_handle().to_host_sync())
565    }
566
567    /// Consume the array and get a host Buffer containing the data values.
568    pub fn into_buffer<T: NativePType>(self) -> Buffer<T> {
569        if T::PTYPE != self.ptype() {
570            vortex_panic!(
571                "Attempted to get buffer of type {} from array of type {}",
572                T::PTYPE,
573                self.ptype()
574            )
575        }
576        Buffer::from_byte_buffer(self.buffer.into_host_sync())
577    }
578
579    /// Extract a mutable buffer from the PrimitiveData. Attempts to do this with zero-copy
580    /// if the buffer is uniquely owned, otherwise will make a copy.
581    pub fn into_buffer_mut<T: NativePType>(self) -> BufferMut<T> {
582        self.try_into_buffer_mut()
583            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
584    }
585
586    /// Try to extract a mutable buffer from the PrimitiveData with zero copy.
587    pub fn try_into_buffer_mut<T: NativePType>(self) -> Result<BufferMut<T>, Buffer<T>> {
588        if T::PTYPE != self.ptype() {
589            vortex_panic!(
590                "Attempted to get buffer_mut of type {} from array of type {}",
591                T::PTYPE,
592                self.ptype()
593            )
594        }
595        let buffer = Buffer::<T>::from_byte_buffer(self.buffer.into_host_sync());
596        buffer.try_into_mut()
597    }
598}