vortex_array/arrays/primitive/array/
mod.rs1use std::iter;
5
6use vortex_buffer::{Alignment, Buffer, BufferMut, ByteBuffer, ByteBufferMut};
7use vortex_dtype::{DType, NativePType, Nullability, PType, match_each_native_ptype};
8use vortex_error::{VortexExpect, VortexResult, vortex_err};
9
10use crate::ToCanonical;
11use crate::stats::ArrayStats;
12use crate::validity::Validity;
13use crate::vtable::ValidityHelper;
14
15mod accessor;
16mod cast;
17mod conversion;
18mod patch;
19mod top_value;
20
21#[derive(Clone, Debug)]
53pub struct PrimitiveArray {
54 pub(super) dtype: DType,
55 pub(super) buffer: ByteBuffer,
56 pub(super) validity: Validity,
57 pub(super) stats_set: ArrayStats,
58}
59
60impl PrimitiveArray {
62 pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
69 let buffer = buffer.into();
70 Self::try_new(buffer, validity).vortex_expect("PrimitiveArray construction failed")
71 }
72
73 #[inline]
82 pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
83 Self::validate(&buffer, &validity)?;
84
85 Ok(unsafe { Self::new_unchecked(buffer, validity) })
87 }
88
89 #[inline]
102 pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> Self {
103 #[cfg(debug_assertions)]
104 Self::validate(&buffer, &validity)
105 .vortex_expect("[Debug Assertion]: Invalid `PrimitiveArray` parameters");
106
107 Self {
108 dtype: DType::Primitive(T::PTYPE, validity.nullability()),
109 buffer: buffer.into_byte_buffer(),
110 validity,
111 stats_set: Default::default(),
112 }
113 }
114
115 #[inline]
119 pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
120 if let Some(len) = validity.maybe_len()
121 && buffer.len() != len
122 {
123 return Err(vortex_err!(
124 "Buffer and validity length mismatch: buffer={}, validity={}",
125 buffer.len(),
126 len
127 ));
128 }
129 Ok(())
130 }
131
132 pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
133 Self::new(Buffer::<T>::empty(), nullability.into())
134 }
135
136 pub fn ptype(&self) -> PType {
137 self.dtype().as_ptype()
138 }
139
140 pub fn byte_buffer(&self) -> &ByteBuffer {
141 &self.buffer
142 }
143
144 pub fn into_byte_buffer(self) -> ByteBuffer {
145 self.buffer
146 }
147
148 pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
149 match_each_native_ptype!(ptype, |T| {
150 Self::new::<T>(Buffer::from_byte_buffer(buffer), validity)
151 })
152 }
153
154 pub fn from_values_byte_buffer(
156 valid_elems_buffer: ByteBuffer,
157 ptype: PType,
158 validity: Validity,
159 n_rows: usize,
160 ) -> Self {
161 let byte_width = ptype.byte_width();
162 let alignment = Alignment::new(byte_width);
163 let buffer = match &validity {
164 Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment),
165 Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment),
166 Validity::Array(is_valid) => {
167 let bool_array = is_valid.to_bool();
168 let bool_buffer = bool_array.boolean_buffer();
169 let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment);
170 for (i, valid_i) in bool_buffer.set_indices().enumerate() {
171 bytes[valid_i * byte_width..(valid_i + 1) * byte_width]
172 .copy_from_slice(&valid_elems_buffer[i * byte_width..(i + 1) * byte_width])
173 }
174 bytes.freeze()
175 }
176 };
177
178 Self::from_byte_buffer(buffer, ptype, validity)
179 }
180
181 pub fn map_each<T, R, F>(self, f: F) -> PrimitiveArray
188 where
189 T: NativePType,
190 R: NativePType,
191 F: FnMut(T) -> R,
192 {
193 let validity = self.validity().clone();
194 let buffer = match self.try_into_buffer_mut() {
195 Ok(buffer_mut) => buffer_mut.map_each(f),
196 Err(parray) => BufferMut::<R>::from_iter(parray.buffer::<T>().iter().copied().map(f)),
197 };
198 PrimitiveArray::new(buffer.freeze(), validity)
199 }
200
201 pub fn map_each_with_validity<T, R, F>(self, f: F) -> VortexResult<PrimitiveArray>
206 where
207 T: NativePType,
208 R: NativePType,
209 F: FnMut((T, bool)) -> R,
210 {
211 let validity = self.validity();
212
213 let buf_iter = self.buffer::<T>().into_iter();
214
215 let buffer = match &validity {
216 Validity::NonNullable | Validity::AllValid => {
217 BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(true)).map(f))
218 }
219 Validity::AllInvalid => {
220 BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(false)).map(f))
221 }
222 Validity::Array(val) => {
223 let val = val.to_bool();
224 BufferMut::<R>::from_iter(buf_iter.zip(val.boolean_buffer()).map(f))
225 }
226 };
227 Ok(PrimitiveArray::new(buffer.freeze(), validity.clone()))
228 }
229}