Skip to main content

vortex_array/arrays/primitive/array/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Display;
5use std::fmt::Formatter;
6use std::iter::repeat;
7
8use smallvec::smallvec;
9use vortex_buffer::Alignment;
10use vortex_buffer::Buffer;
11use vortex_buffer::BufferMut;
12use vortex_buffer::ByteBuffer;
13use vortex_buffer::ByteBufferMut;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_err;
17use vortex_error::vortex_panic;
18
19use crate::ArraySlots;
20use crate::ExecutionCtx;
21use crate::array::Array;
22use crate::array::ArrayParts;
23use crate::array::TypedArrayRef;
24use crate::arrays::BoolArray;
25use crate::arrays::Primitive;
26use crate::arrays::PrimitiveArray;
27use crate::dtype::DType;
28use crate::dtype::NativePType;
29use crate::dtype::Nullability;
30use crate::dtype::PType;
31use crate::match_each_native_ptype;
32use crate::validity::Validity;
33
34mod cast;
35mod conversion;
36mod patch;
37mod top_value;
38
39pub use patch::chunk_range;
40pub use patch::patch_chunk;
41
42use crate::ArrayRef;
43use crate::aggregate_fn::NumericalAggregateOpts;
44use crate::aggregate_fn::fns::min_max::min_max;
45use crate::array::child_to_validity;
46use crate::array::validity_to_child;
47use crate::arrays::bool::BoolArrayExt;
48use crate::buffer::BufferHandle;
49use crate::builtins::ArrayBuiltins;
50
/// Index of the slot holding the validity child, which indicates which elements are non-null.
pub(super) const VALIDITY_SLOT: usize = 0;
/// Total number of slots carried by a primitive array.
pub(super) const NUM_SLOTS: usize = 1;
/// Names of the slots, indexed by slot position.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["validity"];
55
/// A primitive array that stores [native types][crate::dtype::NativePType] in a contiguous buffer
/// of memory, along with an optional validity child.
///
/// This mirrors the Apache Arrow Primitive layout and can be converted into and out of one
/// without allocations or copies.
///
/// The underlying buffer must be natively aligned to the primitive type it represents.
///
/// Values are stored in their native representation with proper alignment.
/// Null values still occupy space in the buffer but are marked invalid in the validity mask.
///
/// Note that this struct itself only carries the element type and the value buffer; the
/// validity is kept as a child of the enclosing array, in the slot at `VALIDITY_SLOT`.
///
/// # Examples
///
/// ```
/// # fn main() -> vortex_error::VortexResult<()> {
/// use vortex_array::arrays::PrimitiveArray;
/// use vortex_array::{VortexSessionExecute, array_session};
///
/// // Create from iterator using FromIterator impl
/// let array: PrimitiveArray = [1i32, 2, 3, 4, 5].into_iter().collect();
///
/// // Slice the array
/// let sliced = array.slice(1..3)?;
///
/// // Access individual values
/// let mut ctx = array_session().create_execution_ctx();
/// let value = sliced.execute_scalar(0, &mut ctx).unwrap();
/// assert_eq!(value, 2i32.into());
///
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct PrimitiveData {
    /// Primitive type of the elements stored in `buffer`.
    pub(super) ptype: PType,
    /// Handle to the bytes backing the values.
    pub(super) buffer: BufferHandle,
}
93
94impl Display for PrimitiveData {
95    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96        write!(f, "ptype: {}", self.ptype)
97    }
98}
99
/// The owned components of a primitive array, as produced by `into_data_parts`.
pub struct PrimitiveDataParts {
    /// Primitive type of the elements.
    pub ptype: PType,
    /// Handle to the buffer holding the values.
    pub buffer: BufferHandle,
    /// Validity of the elements.
    pub validity: Validity,
}
105
106pub trait PrimitiveArrayExt: TypedArrayRef<Primitive> {
107    fn ptype(&self) -> PType {
108        match self.as_ref().dtype() {
109            DType::Primitive(ptype, _) => *ptype,
110            _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
111        }
112    }
113
114    fn nullability(&self) -> Nullability {
115        match self.as_ref().dtype() {
116            DType::Primitive(_, nullability) => *nullability,
117            _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
118        }
119    }
120
121    fn validity_child(&self) -> Option<&ArrayRef> {
122        self.as_ref().slots()[VALIDITY_SLOT].as_ref()
123    }
124
125    fn validity(&self) -> Validity {
126        child_to_validity(
127            self.as_ref().slots()[VALIDITY_SLOT].as_ref(),
128            self.nullability(),
129        )
130    }
131
132    fn buffer_handle(&self) -> &BufferHandle {
133        &self.buffer
134    }
135
136    fn reinterpret_cast(&self, ptype: PType) -> PrimitiveArray {
137        if self.ptype() == ptype {
138            return self.to_owned();
139        }
140
141        assert_eq!(
142            self.ptype().byte_width(),
143            ptype.byte_width(),
144            "can't reinterpret cast between integers of two different widths"
145        );
146
147        PrimitiveArray::from_buffer_handle(self.buffer_handle().clone(), ptype, self.validity())
148    }
149
150    /// Narrow the array to the smallest possible integer type that can represent all values.
151    fn narrow(&self, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
152        if !self.ptype().is_int() {
153            return Ok(self.to_owned());
154        }
155
156        let Some(min_max) = min_max(self.as_ref(), ctx, NumericalAggregateOpts::default())? else {
157            return Ok(PrimitiveArray::new(
158                Buffer::<u8>::zeroed(self.len()),
159                self.validity(),
160            ));
161        };
162
163        // If we can't cast to i64, then leave the array as its original type.
164        // It's too big to downcast anyway.
165        let Ok(min) = min_max
166            .min
167            .cast(&PType::I64.into())
168            .and_then(|s| i64::try_from(&s))
169        else {
170            return Ok(self.to_owned());
171        };
172        let Ok(max) = min_max
173            .max
174            .cast(&PType::I64.into())
175            .and_then(|s| i64::try_from(&s))
176        else {
177            return Ok(self.to_owned());
178        };
179
180        let nullability = self.as_ref().dtype().nullability();
181
182        if min < 0 || max < 0 {
183            // Signed
184            if min >= i8::MIN as i64 && max <= i8::MAX as i64 {
185                let result = self
186                    .as_ref()
187                    .cast(DType::Primitive(PType::I8, nullability))?
188                    .execute::<PrimitiveArray>(ctx)?;
189                return Ok(result);
190            }
191
192            if min >= i16::MIN as i64 && max <= i16::MAX as i64 {
193                let result = self
194                    .as_ref()
195                    .cast(DType::Primitive(PType::I16, nullability))?
196                    .execute::<PrimitiveArray>(ctx)?;
197                return Ok(result);
198            }
199
200            if min >= i32::MIN as i64 && max <= i32::MAX as i64 {
201                let result = self
202                    .as_ref()
203                    .cast(DType::Primitive(PType::I32, nullability))?
204                    .execute::<PrimitiveArray>(ctx)?;
205                return Ok(result);
206            }
207        } else {
208            // Unsigned
209            if max <= u8::MAX as i64 {
210                let result = self
211                    .as_ref()
212                    .cast(DType::Primitive(PType::U8, nullability))?
213                    .execute::<PrimitiveArray>(ctx)?;
214                return Ok(result);
215            }
216
217            if max <= u16::MAX as i64 {
218                let result = self
219                    .as_ref()
220                    .cast(DType::Primitive(PType::U16, nullability))?
221                    .execute::<PrimitiveArray>(ctx)?;
222                return Ok(result);
223            }
224
225            if max <= u32::MAX as i64 {
226                let result = self
227                    .as_ref()
228                    .cast(DType::Primitive(PType::U32, nullability))?
229                    .execute::<PrimitiveArray>(ctx)?;
230                return Ok(result);
231            }
232        }
233
234        Ok(self.to_owned())
235    }
236}
// Blanket impl: every `TypedArrayRef<Primitive>` gets the `PrimitiveArrayExt` methods.
impl<T: TypedArrayRef<Primitive>> PrimitiveArrayExt for T {}
238
239// TODO(connor): There are a lot of places where we could be using `new_unchecked` in the codebase.
240impl PrimitiveData {
241    /// Build the slots vector for this array.
242    pub(super) fn make_slots(validity: &Validity, len: usize) -> ArraySlots {
243        smallvec![validity_to_child(validity, len)]
244    }
245
246    /// Create a new array from a buffer handle.
247    ///
248    /// # Safety
249    ///
250    /// Should ensure that the provided BufferHandle points at sufficiently large region of aligned
251    /// memory to hold the `ptype` values.
252    pub unsafe fn new_unchecked_from_handle(
253        handle: BufferHandle,
254        ptype: PType,
255        _validity: Validity,
256    ) -> Self {
257        Self {
258            ptype,
259            buffer: handle,
260        }
261    }
262
263    /// Creates a new `PrimitiveArray`.
264    ///
265    /// # Panics
266    ///
267    /// Panics if the provided components do not satisfy the invariants documented
268    /// in `PrimitiveArray::new_unchecked`.
269    pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
270        let buffer = buffer.into();
271        Self::try_new(buffer, validity).vortex_expect("PrimitiveArray construction failed")
272    }
273
274    /// Constructs a new `PrimitiveArray`.
275    ///
276    /// See `PrimitiveArray::new_unchecked` for more information.
277    ///
278    /// # Errors
279    ///
280    /// Returns an error if the provided components do not satisfy the invariants documented in
281    /// `PrimitiveArray::new_unchecked`.
282    #[inline]
283    pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
284        Self::validate(&buffer, &validity)?;
285
286        // SAFETY: validate ensures all invariants are met.
287        Ok(unsafe { Self::new_unchecked(buffer, validity) })
288    }
289
290    /// Creates a new `PrimitiveArray` without validation from these components:
291    ///
292    /// * `buffer` is a typed buffer containing the primitive values.
293    /// * `validity` holds the null values.
294    ///
295    /// # Safety
296    ///
297    /// The caller must ensure all of the following invariants are satisfied:
298    ///
299    /// ## Validity Requirements
300    ///
301    /// - If `validity` is [`Validity::Array`], its length must exactly equal `buffer.len()`.
302    #[inline]
303    pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, _validity: Validity) -> Self {
304        #[cfg(debug_assertions)]
305        Self::validate(&buffer, &_validity)
306            .vortex_expect("[Debug Assertion]: Invalid `PrimitiveArray` parameters");
307
308        Self {
309            ptype: T::PTYPE,
310            buffer: BufferHandle::new_host(buffer.into_byte_buffer()),
311        }
312    }
313
314    /// Validates the components that would be used to create a `PrimitiveArray`.
315    ///
316    /// This function checks all the invariants required by `PrimitiveArray::new_unchecked`.
317    #[inline]
318    pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
319        if let Some(len) = validity.maybe_len()
320            && buffer.len() != len
321        {
322            return Err(vortex_err!(
323                InvalidArgument:
324                "Buffer and validity length mismatch: buffer={}, validity={}",
325                buffer.len(),
326                len
327            ));
328        }
329        Ok(())
330    }
331
332    pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
333        Self::new(Buffer::<T>::empty(), nullability.into())
334    }
335}
336
337impl Array<Primitive> {
338    pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
339        let dtype = DType::Primitive(T::PTYPE, nullability);
340        let len = 0;
341        let data = PrimitiveData::empty::<T>(nullability);
342        let slots = PrimitiveData::make_slots(&Validity::from(nullability), len);
343        unsafe {
344            Array::from_parts_unchecked(
345                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
346            )
347        }
348    }
349
350    /// Creates a new `PrimitiveArray`.
351    ///
352    /// # Panics
353    ///
354    /// Panics if the provided components do not satisfy the invariants.
355    pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
356        let buffer = buffer.into();
357        let dtype = DType::Primitive(T::PTYPE, validity.nullability());
358        let len = buffer.len();
359        let slots = PrimitiveData::make_slots(&validity, len);
360        let data = PrimitiveData::new(buffer, validity);
361        unsafe {
362            Array::from_parts_unchecked(
363                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
364            )
365        }
366    }
367
368    /// Constructs a new `PrimitiveArray`.
369    pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
370        let dtype = DType::Primitive(T::PTYPE, validity.nullability());
371        let len = buffer.len();
372        let slots = PrimitiveData::make_slots(&validity, len);
373        let data = PrimitiveData::try_new(buffer, validity)?;
374        Ok(unsafe {
375            Array::from_parts_unchecked(
376                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
377            )
378        })
379    }
380
381    /// Creates a new `PrimitiveArray` without validation.
382    ///
383    /// # Safety
384    ///
385    /// See [`PrimitiveData::new_unchecked`].
386    pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> Self {
387        let dtype = DType::Primitive(T::PTYPE, validity.nullability());
388        let len = buffer.len();
389        let slots = PrimitiveData::make_slots(&validity, len);
390        let data = unsafe { PrimitiveData::new_unchecked(buffer, validity) };
391        unsafe {
392            Array::from_parts_unchecked(
393                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
394            )
395        }
396    }
397
398    /// Create a new array from a buffer handle.
399    ///
400    /// # Safety
401    ///
402    /// See [`PrimitiveData::new_unchecked_from_handle`].
403    pub unsafe fn new_unchecked_from_handle(
404        handle: BufferHandle,
405        ptype: PType,
406        validity: Validity,
407    ) -> Self {
408        let dtype = DType::Primitive(ptype, validity.nullability());
409        let len = handle.len() / ptype.byte_width();
410        let slots = PrimitiveData::make_slots(&validity, len);
411        let data = unsafe { PrimitiveData::new_unchecked_from_handle(handle, ptype, validity) };
412        unsafe {
413            Array::from_parts_unchecked(
414                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
415            )
416        }
417    }
418
419    /// Creates a new `PrimitiveArray` from a [`BufferHandle`].
420    pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, validity: Validity) -> Self {
421        let dtype = DType::Primitive(ptype, validity.nullability());
422        let len = handle.len() / ptype.byte_width();
423        let slots = PrimitiveData::make_slots(&validity, len);
424        let data = PrimitiveData::from_buffer_handle(handle, ptype, validity);
425        Array::try_from_parts(ArrayParts::new(Primitive, dtype, len, data).with_slots(slots))
426            .vortex_expect("PrimitiveData is always valid")
427    }
428
429    /// Creates a new `PrimitiveArray` from a [`ByteBuffer`].
430    pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
431        let dtype = DType::Primitive(ptype, validity.nullability());
432        let len = buffer.len() / ptype.byte_width();
433        let slots = PrimitiveData::make_slots(&validity, len);
434        let data = PrimitiveData::from_byte_buffer(buffer, ptype, validity);
435        unsafe {
436            Array::from_parts_unchecked(
437                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
438            )
439        }
440    }
441
442    /// Create a PrimitiveArray from a byte buffer containing only the valid elements.
443    pub fn from_values_byte_buffer(
444        valid_elems_buffer: ByteBuffer,
445        ptype: PType,
446        validity: Validity,
447        n_rows: usize,
448        ctx: &mut ExecutionCtx,
449    ) -> Self {
450        let dtype = DType::Primitive(ptype, validity.nullability());
451        let len = n_rows;
452        let slots = PrimitiveData::make_slots(&validity, len);
453        let data = PrimitiveData::from_values_byte_buffer(
454            valid_elems_buffer,
455            ptype,
456            validity,
457            n_rows,
458            ctx,
459        );
460        unsafe {
461            Array::from_parts_unchecked(
462                ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
463            )
464        }
465    }
466
467    /// Validates the components that would be used to create a `PrimitiveArray`.
468    pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
469        PrimitiveData::validate(buffer, validity)
470    }
471
472    pub fn into_data_parts(self) -> PrimitiveDataParts {
473        let validity = PrimitiveArrayExt::validity(&self);
474        let ptype = PrimitiveArrayExt::ptype(&self);
475        let data = self.into_data();
476        PrimitiveDataParts {
477            ptype,
478            buffer: data.buffer,
479            validity,
480        }
481    }
482
483    pub fn map_each_with_validity<T, R, F>(self, ctx: &mut ExecutionCtx, f: F) -> VortexResult<Self>
484    where
485        T: NativePType,
486        R: NativePType,
487        F: FnMut((T, bool)) -> R,
488    {
489        let validity = PrimitiveArrayExt::validity(&self);
490        let data = self.into_data();
491        let buf_iter = data.to_buffer::<T>().into_iter();
492
493        let buffer = match &validity {
494            Validity::NonNullable | Validity::AllValid => {
495                Buffer::<R>::from_trusted_len_iter(buf_iter.zip(repeat(true)).map(f))
496            }
497            Validity::AllInvalid => {
498                Buffer::<R>::from_trusted_len_iter(buf_iter.zip(repeat(false)).map(f))
499            }
500            Validity::Array(val) => {
501                let val = val.clone().execute::<BoolArray>(ctx)?.into_bit_buffer();
502                Buffer::<R>::from_trusted_len_iter(buf_iter.zip(val.iter()).map(f))
503            }
504        };
505        Ok(PrimitiveArray::new(buffer, validity))
506    }
507}
508
509impl PrimitiveData {
510    pub fn len(&self) -> usize {
511        self.buffer.len() / self.ptype.byte_width()
512    }
513
514    /// Returns `true` if the array is empty.
515    pub fn is_empty(&self) -> bool {
516        self.buffer.is_empty()
517    }
518
519    pub fn ptype(&self) -> PType {
520        self.ptype
521    }
522
523    /// Get access to the buffer handle backing the array.
524    pub fn buffer_handle(&self) -> &BufferHandle {
525        &self.buffer
526    }
527
528    pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, _validity: Validity) -> Self {
529        Self {
530            ptype,
531            buffer: handle,
532        }
533    }
534
535    pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
536        match_each_native_ptype!(ptype, |T| {
537            Self::new::<T>(Buffer::from_byte_buffer(buffer), validity)
538        })
539    }
540
541    /// Create a PrimitiveArray from a byte buffer containing only the valid elements.
542    pub fn from_values_byte_buffer(
543        valid_elems_buffer: ByteBuffer,
544        ptype: PType,
545        validity: Validity,
546        n_rows: usize,
547        ctx: &mut ExecutionCtx,
548    ) -> Self {
549        let byte_width = ptype.byte_width();
550        let alignment = Alignment::new(byte_width);
551        let buffer = match &validity {
552            Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment),
553            Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment),
554            Validity::Array(is_valid) => {
555                let bool_array = is_valid
556                    .clone()
557                    .execute::<BoolArray>(ctx)
558                    .vortex_expect("must be a bool array");
559                let bool_buffer = bool_array.bit_buffer_view();
560                let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment);
561                for (i, valid_i) in bool_buffer.set_indices().enumerate() {
562                    bytes[valid_i * byte_width..(valid_i + 1) * byte_width]
563                        .copy_from_slice(&valid_elems_buffer[i * byte_width..(i + 1) * byte_width])
564                }
565                bytes.freeze()
566            }
567        };
568
569        Self::from_byte_buffer(buffer, ptype, validity)
570    }
571
572    /// Get a buffer in host memory holding all the values.
573    ///
574    /// NOTE: some values may be nonsense if the validity buffer indicates that the value is null.
575    pub fn to_buffer<T: NativePType>(&self) -> Buffer<T> {
576        if T::PTYPE != self.ptype() {
577            vortex_panic!(
578                "Attempted to get buffer of type {} from array of type {}",
579                T::PTYPE,
580                self.ptype()
581            )
582        }
583        Buffer::from_byte_buffer(self.buffer_handle().to_host_sync())
584    }
585
586    /// Consume the array and get a host Buffer containing the data values.
587    pub fn into_buffer<T: NativePType>(self) -> Buffer<T> {
588        if T::PTYPE != self.ptype() {
589            vortex_panic!(
590                "Attempted to get buffer of type {} from array of type {}",
591                T::PTYPE,
592                self.ptype()
593            )
594        }
595        Buffer::from_byte_buffer(self.buffer.into_host_sync())
596    }
597
598    /// Extract a mutable buffer from the PrimitiveData. Attempts to do this with zero-copy
599    /// if the buffer is uniquely owned, otherwise will make a copy.
600    pub fn into_buffer_mut<T: NativePType>(self) -> BufferMut<T> {
601        self.try_into_buffer_mut()
602            .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
603    }
604
605    /// Try to extract a mutable buffer from the PrimitiveData with zero copy.
606    ///
607    /// # Panic
608    /// If the buffer is not of type T this will panic
609    pub fn try_into_buffer_mut<T: NativePType>(self) -> Result<BufferMut<T>, Buffer<T>> {
610        if T::PTYPE != self.ptype() {
611            vortex_panic!(
612                "Attempted to get buffer_mut of type {} from array of type {}",
613                T::PTYPE,
614                self.ptype()
615            )
616        }
617        let buffer = Buffer::<T>::from_byte_buffer(self.buffer.into_host_sync());
618        buffer.try_into_mut()
619    }
620}