Skip to main content

vortex_array/arrays/primitive/array/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Display;
5use std::fmt::Formatter;
6use std::iter;
7
8use vortex_buffer::Alignment;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_err;
16use vortex_error::vortex_panic;
17
18use crate::LEGACY_SESSION;
19use crate::ToCanonical;
20use crate::VortexSessionExecute;
21use crate::array::Array;
22use crate::array::ArrayParts;
23use crate::array::TypedArrayRef;
24use crate::arrays::Primitive;
25use crate::arrays::PrimitiveArray;
26use crate::dtype::DType;
27use crate::dtype::NativePType;
28use crate::dtype::Nullability;
29use crate::dtype::PType;
30use crate::match_each_native_ptype;
31use crate::validity::Validity;
32
33mod accessor;
34mod cast;
35mod conversion;
36mod patch;
37mod top_value;
38
39pub use patch::chunk_range;
40pub use patch::patch_chunk;
41
42use crate::ArrayRef;
43use crate::aggregate_fn::fns::min_max::min_max;
44use crate::array::child_to_validity;
45use crate::array::validity_to_child;
46use crate::arrays::bool::BoolArrayExt;
47use crate::buffer::BufferHandle;
48use crate::builtins::ArrayBuiltins;
49
50/// The validity bitmap indicating which elements are non-null.
51pub(super) const VALIDITY_SLOT: usize = 0;
52pub(super) const NUM_SLOTS: usize = 1;
53pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["validity"];
54
/// A primitive array that stores [native types][crate::dtype::NativePType] in a contiguous buffer
/// of memory, along with an optional validity child.
///
/// This mirrors the Apache Arrow Primitive layout and can be converted into and out of one
/// without allocations or copies.
///
/// The underlying buffer must be natively aligned to the primitive type it represents.
///
/// Values are stored in their native representation with proper alignment.
/// Null values still occupy space in the buffer but are marked invalid in the validity mask.
///
/// `PrimitiveData` itself only holds the element type and the value buffer; the validity child
/// is kept in the owning array's slots (built by `PrimitiveData::make_slots`).
///
/// # Examples
///
/// ```
/// # fn main() -> vortex_error::VortexResult<()> {
/// use vortex_array::arrays::PrimitiveArray;
///
/// // Create from iterator using FromIterator impl
/// let array: PrimitiveArray = [1i32, 2, 3, 4, 5].into_iter().collect();
///
/// // Slice the array
/// let sliced = array.slice(1..3)?;
///
/// // Access individual values
/// let value = sliced.scalar_at(0).unwrap();
/// assert_eq!(value, 2i32.into());
///
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct PrimitiveData {
    /// The primitive type of every element stored in `buffer`.
    pub(super) ptype: PType,
    /// The raw value bytes; the element count is `buffer.len() / ptype.byte_width()`.
    pub(super) buffer: BufferHandle,
}
90
91impl Display for PrimitiveData {
92    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
93        write!(f, "ptype: {}", self.ptype)
94    }
95}
96
/// The owned components of a primitive array, as returned by `into_data_parts`.
pub struct PrimitiveDataParts {
    /// The primitive type of the values held in `buffer`.
    pub ptype: PType,
    /// Handle to the raw bytes holding the values.
    pub buffer: BufferHandle,
    /// Which values are valid (non-null).
    pub validity: Validity,
}
102
103pub trait PrimitiveArrayExt: TypedArrayRef<Primitive> {
104    fn ptype(&self) -> PType {
105        match self.as_ref().dtype() {
106            DType::Primitive(ptype, _) => *ptype,
107            _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
108        }
109    }
110
111    fn nullability(&self) -> Nullability {
112        match self.as_ref().dtype() {
113            DType::Primitive(_, nullability) => *nullability,
114            _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
115        }
116    }
117
118    fn validity_child(&self) -> Option<&ArrayRef> {
119        self.as_ref().slots()[VALIDITY_SLOT].as_ref()
120    }
121
122    fn validity(&self) -> Validity {
123        child_to_validity(&self.as_ref().slots()[VALIDITY_SLOT], self.nullability())
124    }
125
126    fn validity_mask(&self) -> vortex_mask::Mask {
127        self.validity().to_mask(self.as_ref().len())
128    }
129
130    fn buffer_handle(&self) -> &BufferHandle {
131        &self.buffer
132    }
133
134    fn reinterpret_cast(&self, ptype: PType) -> PrimitiveArray {
135        if self.ptype() == ptype {
136            return self.to_owned();
137        }
138
139        assert_eq!(
140            self.ptype().byte_width(),
141            ptype.byte_width(),
142            "can't reinterpret cast between integers of two different widths"
143        );
144
145        PrimitiveArray::from_buffer_handle(self.buffer_handle().clone(), ptype, self.validity())
146    }
147
148    /// Narrow the array to the smallest possible integer type that can represent all values.
149    fn narrow(&self) -> VortexResult<PrimitiveArray> {
150        if !self.ptype().is_int() {
151            return Ok(self.to_owned());
152        }
153
154        let mut ctx = LEGACY_SESSION.create_execution_ctx();
155        let Some(min_max) = min_max(self.as_ref(), &mut ctx)? else {
156            return Ok(PrimitiveArray::new(
157                Buffer::<u8>::zeroed(self.len()),
158                self.validity(),
159            ));
160        };
161
162        // If we can't cast to i64, then leave the array as its original type.
163        // It's too big to downcast anyway.
164        let Ok(min) = min_max
165            .min
166            .cast(&PType::I64.into())
167            .and_then(|s| i64::try_from(&s))
168        else {
169            return Ok(self.to_owned());
170        };
171        let Ok(max) = min_max
172            .max
173            .cast(&PType::I64.into())
174            .and_then(|s| i64::try_from(&s))
175        else {
176            return Ok(self.to_owned());
177        };
178
179        let nullability = self.as_ref().dtype().nullability();
180
181        if min < 0 || max < 0 {
182            // Signed
183            if min >= i8::MIN as i64 && max <= i8::MAX as i64 {
184                return Ok(self
185                    .as_ref()
186                    .cast(DType::Primitive(PType::I8, nullability))?
187                    .to_primitive());
188            }
189
190            if min >= i16::MIN as i64 && max <= i16::MAX as i64 {
191                return Ok(self
192                    .as_ref()
193                    .cast(DType::Primitive(PType::I16, nullability))?
194                    .to_primitive());
195            }
196
197            if min >= i32::MIN as i64 && max <= i32::MAX as i64 {
198                return Ok(self
199                    .as_ref()
200                    .cast(DType::Primitive(PType::I32, nullability))?
201                    .to_primitive());
202            }
203        } else {
204            // Unsigned
205            if max <= u8::MAX as i64 {
206                return Ok(self
207                    .as_ref()
208                    .cast(DType::Primitive(PType::U8, nullability))?
209                    .to_primitive());
210            }
211
212            if max <= u16::MAX as i64 {
213                return Ok(self
214                    .as_ref()
215                    .cast(DType::Primitive(PType::U16, nullability))?
216                    .to_primitive());
217            }
218
219            if max <= u32::MAX as i64 {
220                return Ok(self
221                    .as_ref()
222                    .cast(DType::Primitive(PType::U32, nullability))?
223                    .to_primitive());
224            }
225        }
226
227        Ok(self.to_owned())
228    }
229}
230impl<T: TypedArrayRef<Primitive>> PrimitiveArrayExt for T {}
231
232// TODO(connor): There are a lot of places where we could be using `new_unchecked` in the codebase.
233impl PrimitiveData {
234    /// Build the slots vector for this array.
235    pub(super) fn make_slots(validity: &Validity, len: usize) -> Vec<Option<ArrayRef>> {
236        vec![validity_to_child(validity, len)]
237    }
238
239    /// Create a new array from a buffer handle.
240    ///
241    /// # Safety
242    ///
243    /// Should ensure that the provided BufferHandle points at sufficiently large region of aligned
244    /// memory to hold the `ptype` values.
245    pub unsafe fn new_unchecked_from_handle(
246        handle: BufferHandle,
247        ptype: PType,
248        _validity: Validity,
249    ) -> Self {
250        Self {
251            ptype,
252            buffer: handle,
253        }
254    }
255
256    /// Creates a new `PrimitiveArray`.
257    ///
258    /// # Panics
259    ///
260    /// Panics if the provided components do not satisfy the invariants documented
261    /// in `PrimitiveArray::new_unchecked`.
262    pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
263        let buffer = buffer.into();
264        Self::try_new(buffer, validity).vortex_expect("PrimitiveArray construction failed")
265    }
266
267    /// Constructs a new `PrimitiveArray`.
268    ///
269    /// See `PrimitiveArray::new_unchecked` for more information.
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if the provided components do not satisfy the invariants documented in
274    /// `PrimitiveArray::new_unchecked`.
275    #[inline]
276    pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
277        Self::validate(&buffer, &validity)?;
278
279        // SAFETY: validate ensures all invariants are met.
280        Ok(unsafe { Self::new_unchecked(buffer, validity) })
281    }
282
283    /// Creates a new `PrimitiveArray` without validation from these components:
284    ///
285    /// * `buffer` is a typed buffer containing the primitive values.
286    /// * `validity` holds the null values.
287    ///
288    /// # Safety
289    ///
290    /// The caller must ensure all of the following invariants are satisfied:
291    ///
292    /// ## Validity Requirements
293    ///
294    /// - If `validity` is [`Validity::Array`], its length must exactly equal `buffer.len()`.
295    #[inline]
296    pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, _validity: Validity) -> Self {
297        #[cfg(debug_assertions)]
298        Self::validate(&buffer, &_validity)
299            .vortex_expect("[Debug Assertion]: Invalid `PrimitiveArray` parameters");
300
301        Self {
302            ptype: T::PTYPE,
303            buffer: BufferHandle::new_host(buffer.into_byte_buffer()),
304        }
305    }
306
307    /// Validates the components that would be used to create a `PrimitiveArray`.
308    ///
309    /// This function checks all the invariants required by `PrimitiveArray::new_unchecked`.
310    #[inline]
311    pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
312        if let Some(len) = validity.maybe_len()
313            && buffer.len() != len
314        {
315            return Err(vortex_err!(
316                InvalidArgument:
317                "Buffer and validity length mismatch: buffer={}, validity={}",
318                buffer.len(),
319                len
320            ));
321        }
322        Ok(())
323    }
324
325    pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
326        Self::new(Buffer::<T>::empty(), nullability.into())
327    }
328}
329
330impl Array<Primitive> {
331    pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
332        let dtype = DType::Primitive(T::PTYPE, nullability);
333        let len = 0;
334        let data = PrimitiveData::empty::<T>(nullability);
335        let slots = PrimitiveData::make_slots(&Validity::from(nullability), len);
336        unsafe {
337            Array::from_parts_unchecked(
338                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
339            )
340        }
341    }
342
343    /// Creates a new `PrimitiveArray`.
344    ///
345    /// # Panics
346    ///
347    /// Panics if the provided components do not satisfy the invariants.
348    pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
349        let buffer = buffer.into();
350        let dtype = DType::Primitive(T::PTYPE, validity.nullability());
351        let len = buffer.len();
352        let slots = PrimitiveData::make_slots(&validity, len);
353        let data = PrimitiveData::new(buffer, validity);
354        unsafe {
355            Array::from_parts_unchecked(
356                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
357            )
358        }
359    }
360
361    /// Constructs a new `PrimitiveArray`.
362    pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
363        let dtype = DType::Primitive(T::PTYPE, validity.nullability());
364        let len = buffer.len();
365        let slots = PrimitiveData::make_slots(&validity, len);
366        let data = PrimitiveData::try_new(buffer, validity)?;
367        Ok(unsafe {
368            Array::from_parts_unchecked(
369                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
370            )
371        })
372    }
373
374    /// Creates a new `PrimitiveArray` without validation.
375    ///
376    /// # Safety
377    ///
378    /// See [`PrimitiveData::new_unchecked`].
379    pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> Self {
380        let dtype = DType::Primitive(T::PTYPE, validity.nullability());
381        let len = buffer.len();
382        let slots = PrimitiveData::make_slots(&validity, len);
383        let data = unsafe { PrimitiveData::new_unchecked(buffer, validity) };
384        unsafe {
385            Array::from_parts_unchecked(
386                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
387            )
388        }
389    }
390
391    /// Create a new array from a buffer handle.
392    ///
393    /// # Safety
394    ///
395    /// See [`PrimitiveData::new_unchecked_from_handle`].
396    pub unsafe fn new_unchecked_from_handle(
397        handle: BufferHandle,
398        ptype: PType,
399        validity: Validity,
400    ) -> Self {
401        let dtype = DType::Primitive(ptype, validity.nullability());
402        let len = handle.len() / ptype.byte_width();
403        let slots = PrimitiveData::make_slots(&validity, len);
404        let data = unsafe { PrimitiveData::new_unchecked_from_handle(handle, ptype, validity) };
405        unsafe {
406            Array::from_parts_unchecked(
407                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
408            )
409        }
410    }
411
412    /// Creates a new `PrimitiveArray` from a [`BufferHandle`].
413    pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, validity: Validity) -> Self {
414        let dtype = DType::Primitive(ptype, validity.nullability());
415        let len = handle.len() / ptype.byte_width();
416        let slots = PrimitiveData::make_slots(&validity, len);
417        let data = PrimitiveData::from_buffer_handle(handle, ptype, validity);
418        Array::try_from_parts(ArrayParts::new(Primitive, dtype, len, data).with_slots(slots))
419            .vortex_expect("PrimitiveData is always valid")
420    }
421
422    /// Creates a new `PrimitiveArray` from a [`ByteBuffer`].
423    pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
424        let dtype = DType::Primitive(ptype, validity.nullability());
425        let len = buffer.len() / ptype.byte_width();
426        let slots = PrimitiveData::make_slots(&validity, len);
427        let data = PrimitiveData::from_byte_buffer(buffer, ptype, validity);
428        unsafe {
429            Array::from_parts_unchecked(
430                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
431            )
432        }
433    }
434
435    /// Create a PrimitiveArray from a byte buffer containing only the valid elements.
436    pub fn from_values_byte_buffer(
437        valid_elems_buffer: ByteBuffer,
438        ptype: PType,
439        validity: Validity,
440        n_rows: usize,
441    ) -> Self {
442        let dtype = DType::Primitive(ptype, validity.nullability());
443        let len = n_rows;
444        let slots = PrimitiveData::make_slots(&validity, len);
445        let data =
446            PrimitiveData::from_values_byte_buffer(valid_elems_buffer, ptype, validity, n_rows);
447        unsafe {
448            Array::from_parts_unchecked(
449                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
450            )
451        }
452    }
453
454    /// Validates the components that would be used to create a `PrimitiveArray`.
455    pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
456        PrimitiveData::validate(buffer, validity)
457    }
458
459    pub fn into_data_parts(self) -> PrimitiveDataParts {
460        let validity = PrimitiveArrayExt::validity(&self);
461        let ptype = PrimitiveArrayExt::ptype(&self);
462        let data = self.into_data();
463        PrimitiveDataParts {
464            ptype,
465            buffer: data.buffer,
466            validity,
467        }
468    }
469
470    pub fn map_each_with_validity<T, R, F>(self, f: F) -> VortexResult<Self>
471    where
472        T: NativePType,
473        R: NativePType,
474        F: FnMut((T, bool)) -> R,
475    {
476        let validity = PrimitiveArrayExt::validity(&self);
477        let data = self.into_data();
478        let buf_iter = data.to_buffer::<T>().into_iter();
479
480        let buffer = match &validity {
481            Validity::NonNullable | Validity::AllValid => {
482                BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(true)).map(f))
483            }
484            Validity::AllInvalid => {
485                BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(false)).map(f))
486            }
487            Validity::Array(val) => {
488                let val = val.to_bool().into_bit_buffer();
489                BufferMut::<R>::from_iter(buf_iter.zip(val.iter()).map(f))
490            }
491        };
492        Ok(PrimitiveArray::new(buffer.freeze(), validity))
493    }
494}
495
496impl PrimitiveData {
497    pub fn len(&self) -> usize {
498        self.buffer.len() / self.ptype.byte_width()
499    }
500
501    /// Returns `true` if the array is empty.
502    pub fn is_empty(&self) -> bool {
503        self.buffer.is_empty()
504    }
505
506    pub fn ptype(&self) -> PType {
507        self.ptype
508    }
509
510    /// Get access to the buffer handle backing the array.
511    pub fn buffer_handle(&self) -> &BufferHandle {
512        &self.buffer
513    }
514
515    pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, _validity: Validity) -> Self {
516        Self {
517            ptype,
518            buffer: handle,
519        }
520    }
521
522    pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
523        match_each_native_ptype!(ptype, |T| {
524            Self::new::<T>(Buffer::from_byte_buffer(buffer), validity)
525        })
526    }
527
528    /// Create a PrimitiveArray from a byte buffer containing only the valid elements.
529    pub fn from_values_byte_buffer(
530        valid_elems_buffer: ByteBuffer,
531        ptype: PType,
532        validity: Validity,
533        n_rows: usize,
534    ) -> Self {
535        let byte_width = ptype.byte_width();
536        let alignment = Alignment::new(byte_width);
537        let buffer = match &validity {
538            Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment),
539            Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment),
540            Validity::Array(is_valid) => {
541                let bool_array = is_valid.to_bool();
542                let bool_buffer = bool_array.to_bit_buffer();
543                let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment);
544                for (i, valid_i) in bool_buffer.set_indices().enumerate() {
545                    bytes[valid_i * byte_width..(valid_i + 1) * byte_width]
546                        .copy_from_slice(&valid_elems_buffer[i * byte_width..(i + 1) * byte_width])
547                }
548                bytes.freeze()
549            }
550        };
551
552        Self::from_byte_buffer(buffer, ptype, validity)
553    }
554
555    /// Get a buffer in host memory holding all the values.
556    ///
557    /// NOTE: some values may be nonsense if the validity buffer indicates that the value is null.
558    pub fn to_buffer<T: NativePType>(&self) -> Buffer<T> {
559        if T::PTYPE != self.ptype() {
560            vortex_panic!(
561                "Attempted to get buffer of type {} from array of type {}",
562                T::PTYPE,
563                self.ptype()
564            )
565        }
566        Buffer::from_byte_buffer(self.buffer_handle().to_host_sync())
567    }
568
569    /// Consume the array and get a host Buffer containing the data values.
570    pub fn into_buffer<T: NativePType>(self) -> Buffer<T> {
571        if T::PTYPE != self.ptype() {
572            vortex_panic!(
573                "Attempted to get buffer of type {} from array of type {}",
574                T::PTYPE,
575                self.ptype()
576            )
577        }
578        Buffer::from_byte_buffer(self.buffer.into_host_sync())
579    }
580
581    /// Extract a mutable buffer from the PrimitiveData. Attempts to do this with zero-copy
582    /// if the buffer is uniquely owned, otherwise will make a copy.
583    pub fn into_buffer_mut<T: NativePType>(self) -> BufferMut<T> {
584        self.try_into_buffer_mut()
585            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
586    }
587
588    /// Try to extract a mutable buffer from the PrimitiveData with zero copy.
589    pub fn try_into_buffer_mut<T: NativePType>(self) -> Result<BufferMut<T>, Buffer<T>> {
590        if T::PTYPE != self.ptype() {
591            vortex_panic!(
592                "Attempted to get buffer_mut of type {} from array of type {}",
593                T::PTYPE,
594                self.ptype()
595            )
596        }
597        let buffer = Buffer::<T>::from_byte_buffer(self.buffer.into_host_sync());
598        buffer.try_into_mut()
599    }
600}