vortex_array/arrays/primitive/array/
mod.rs1use std::iter;
5
6use vortex_buffer::{Alignment, Buffer, BufferMut, ByteBuffer, ByteBufferMut};
7use vortex_dtype::{DType, NativePType, Nullability, PType, match_each_native_ptype};
8use vortex_error::{VortexExpect, VortexResult, vortex_err};
9
10use crate::ToCanonical;
11use crate::stats::ArrayStats;
12use crate::validity::Validity;
13use crate::vtable::ValidityHelper;
14
15mod accessor;
16mod cast;
17mod conversion;
18mod patch;
19mod top_value;
20
21pub use patch::patch_chunk;
22
23#[derive(Clone, Debug)]
55pub struct PrimitiveArray {
56 pub(super) dtype: DType,
57 pub(super) buffer: ByteBuffer,
58 pub(super) validity: Validity,
59 pub(super) stats_set: ArrayStats,
60}
61
62impl PrimitiveArray {
64 pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
71 let buffer = buffer.into();
72 Self::try_new(buffer, validity).vortex_expect("PrimitiveArray construction failed")
73 }
74
75 #[inline]
84 pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
85 Self::validate(&buffer, &validity)?;
86
87 Ok(unsafe { Self::new_unchecked(buffer, validity) })
89 }
90
91 #[inline]
104 pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> Self {
105 #[cfg(debug_assertions)]
106 Self::validate(&buffer, &validity)
107 .vortex_expect("[Debug Assertion]: Invalid `PrimitiveArray` parameters");
108
109 Self {
110 dtype: DType::Primitive(T::PTYPE, validity.nullability()),
111 buffer: buffer.into_byte_buffer(),
112 validity,
113 stats_set: Default::default(),
114 }
115 }
116
117 #[inline]
121 pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
122 if let Some(len) = validity.maybe_len()
123 && buffer.len() != len
124 {
125 return Err(vortex_err!(
126 "Buffer and validity length mismatch: buffer={}, validity={}",
127 buffer.len(),
128 len
129 ));
130 }
131 Ok(())
132 }
133
134 pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
135 Self::new(Buffer::<T>::empty(), nullability.into())
136 }
137
138 pub fn ptype(&self) -> PType {
139 self.dtype().as_ptype()
140 }
141
142 pub fn byte_buffer(&self) -> &ByteBuffer {
143 &self.buffer
144 }
145
146 pub fn into_byte_buffer(self) -> ByteBuffer {
147 self.buffer
148 }
149
150 pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
151 match_each_native_ptype!(ptype, |T| {
152 Self::new::<T>(Buffer::from_byte_buffer(buffer), validity)
153 })
154 }
155
156 pub fn from_values_byte_buffer(
158 valid_elems_buffer: ByteBuffer,
159 ptype: PType,
160 validity: Validity,
161 n_rows: usize,
162 ) -> Self {
163 let byte_width = ptype.byte_width();
164 let alignment = Alignment::new(byte_width);
165 let buffer = match &validity {
166 Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment),
167 Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment),
168 Validity::Array(is_valid) => {
169 let bool_array = is_valid.to_bool();
170 let bool_buffer = bool_array.bit_buffer();
171 let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment);
172 for (i, valid_i) in bool_buffer.set_indices().enumerate() {
173 bytes[valid_i * byte_width..(valid_i + 1) * byte_width]
174 .copy_from_slice(&valid_elems_buffer[i * byte_width..(i + 1) * byte_width])
175 }
176 bytes.freeze()
177 }
178 };
179
180 Self::from_byte_buffer(buffer, ptype, validity)
181 }
182
183 pub fn map_each<T, R, F>(self, f: F) -> PrimitiveArray
190 where
191 T: NativePType,
192 R: NativePType,
193 F: FnMut(T) -> R,
194 {
195 let validity = self.validity().clone();
196 let buffer = match self.try_into_buffer_mut() {
197 Ok(buffer_mut) => buffer_mut.map_each_in_place(f),
198 Err(parray) => BufferMut::<R>::from_iter(parray.buffer::<T>().iter().copied().map(f)),
199 };
200 PrimitiveArray::new(buffer.freeze(), validity)
201 }
202
203 pub fn map_each_with_validity<T, R, F>(self, f: F) -> VortexResult<PrimitiveArray>
208 where
209 T: NativePType,
210 R: NativePType,
211 F: FnMut((T, bool)) -> R,
212 {
213 let validity = self.validity();
214
215 let buf_iter = self.buffer::<T>().into_iter();
216
217 let buffer = match &validity {
218 Validity::NonNullable | Validity::AllValid => {
219 BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(true)).map(f))
220 }
221 Validity::AllInvalid => {
222 BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(false)).map(f))
223 }
224 Validity::Array(val) => {
225 let val = val.to_bool();
226 BufferMut::<R>::from_iter(buf_iter.zip(val.bit_buffer()).map(f))
227 }
228 };
229 Ok(PrimitiveArray::new(buffer.freeze(), validity.clone()))
230 }
231}