1use std::fmt::Display;
5use std::fmt::Formatter;
6use std::iter;
7
8use vortex_buffer::Alignment;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_err;
16use vortex_error::vortex_panic;
17
18use crate::LEGACY_SESSION;
19use crate::ToCanonical;
20use crate::VortexSessionExecute;
21use crate::array::Array;
22use crate::array::ArrayParts;
23use crate::array::TypedArrayRef;
24use crate::arrays::Primitive;
25use crate::arrays::PrimitiveArray;
26use crate::dtype::DType;
27use crate::dtype::NativePType;
28use crate::dtype::Nullability;
29use crate::dtype::PType;
30use crate::match_each_native_ptype;
31use crate::validity::Validity;
32
33mod accessor;
34mod cast;
35mod conversion;
36mod patch;
37mod top_value;
38
39pub use patch::chunk_range;
40pub use patch::patch_chunk;
41
42use crate::ArrayRef;
43use crate::aggregate_fn::fns::min_max::min_max;
44use crate::array::child_to_validity;
45use crate::array::validity_to_child;
46use crate::arrays::bool::BoolArrayExt;
47use crate::buffer::BufferHandle;
48use crate::builtins::ArrayBuiltins;
49
/// Index of the validity child within a primitive array's slot vector.
pub(super) const VALIDITY_SLOT: usize = 0;
/// Number of child slots a primitive array carries (just the validity slot).
pub(super) const NUM_SLOTS: usize = 1;
/// Name of each slot, indexed by slot position (so `SLOT_NAMES[VALIDITY_SLOT] == "validity"`).
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["validity"];
54
/// Type-specific payload of a primitive array: the physical element type plus the raw value bytes.
///
/// Validity is not stored here; it lives in the enclosing array's slots (at `VALIDITY_SLOT`).
/// The element count is the buffer's byte length divided by `ptype.byte_width()`.
#[derive(Clone, Debug)]
pub struct PrimitiveData {
    /// Physical type of every element in `buffer`.
    pub(super) ptype: PType,
    /// Element values as raw bytes. Typed accessors obtain host bytes via
    /// `to_host_sync` / `into_host_sync` before reinterpreting them as `Buffer<T>`.
    pub(super) buffer: BufferHandle,
}
90
91impl Display for PrimitiveData {
92 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
93 write!(f, "ptype: {}", self.ptype)
94 }
95}
96
/// Owned components of a primitive array, as produced by `Array::<Primitive>::into_data_parts`.
pub struct PrimitiveDataParts {
    /// Physical element type.
    pub ptype: PType,
    /// Raw value bytes.
    pub buffer: BufferHandle,
    /// Validity reconstructed from the array's validity slot.
    pub validity: Validity,
}
102
103pub trait PrimitiveArrayExt: TypedArrayRef<Primitive> {
104 fn ptype(&self) -> PType {
105 match self.as_ref().dtype() {
106 DType::Primitive(ptype, _) => *ptype,
107 _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
108 }
109 }
110
111 fn nullability(&self) -> Nullability {
112 match self.as_ref().dtype() {
113 DType::Primitive(_, nullability) => *nullability,
114 _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
115 }
116 }
117
118 fn validity_child(&self) -> Option<&ArrayRef> {
119 self.as_ref().slots()[VALIDITY_SLOT].as_ref()
120 }
121
122 fn validity(&self) -> Validity {
123 child_to_validity(&self.as_ref().slots()[VALIDITY_SLOT], self.nullability())
124 }
125
126 fn validity_mask(&self) -> vortex_mask::Mask {
127 self.validity().to_mask(self.as_ref().len())
128 }
129
130 fn buffer_handle(&self) -> &BufferHandle {
131 &self.buffer
132 }
133
134 fn reinterpret_cast(&self, ptype: PType) -> PrimitiveArray {
135 if self.ptype() == ptype {
136 return self.to_owned();
137 }
138
139 assert_eq!(
140 self.ptype().byte_width(),
141 ptype.byte_width(),
142 "can't reinterpret cast between integers of two different widths"
143 );
144
145 PrimitiveArray::from_buffer_handle(self.buffer_handle().clone(), ptype, self.validity())
146 }
147
148 fn narrow(&self) -> VortexResult<PrimitiveArray> {
150 if !self.ptype().is_int() {
151 return Ok(self.to_owned());
152 }
153
154 let mut ctx = LEGACY_SESSION.create_execution_ctx();
155 let Some(min_max) = min_max(self.as_ref(), &mut ctx)? else {
156 return Ok(PrimitiveArray::new(
157 Buffer::<u8>::zeroed(self.len()),
158 self.validity(),
159 ));
160 };
161
162 let Ok(min) = min_max
165 .min
166 .cast(&PType::I64.into())
167 .and_then(|s| i64::try_from(&s))
168 else {
169 return Ok(self.to_owned());
170 };
171 let Ok(max) = min_max
172 .max
173 .cast(&PType::I64.into())
174 .and_then(|s| i64::try_from(&s))
175 else {
176 return Ok(self.to_owned());
177 };
178
179 let nullability = self.as_ref().dtype().nullability();
180
181 if min < 0 || max < 0 {
182 if min >= i8::MIN as i64 && max <= i8::MAX as i64 {
184 return Ok(self
185 .as_ref()
186 .cast(DType::Primitive(PType::I8, nullability))?
187 .to_primitive());
188 }
189
190 if min >= i16::MIN as i64 && max <= i16::MAX as i64 {
191 return Ok(self
192 .as_ref()
193 .cast(DType::Primitive(PType::I16, nullability))?
194 .to_primitive());
195 }
196
197 if min >= i32::MIN as i64 && max <= i32::MAX as i64 {
198 return Ok(self
199 .as_ref()
200 .cast(DType::Primitive(PType::I32, nullability))?
201 .to_primitive());
202 }
203 } else {
204 if max <= u8::MAX as i64 {
206 return Ok(self
207 .as_ref()
208 .cast(DType::Primitive(PType::U8, nullability))?
209 .to_primitive());
210 }
211
212 if max <= u16::MAX as i64 {
213 return Ok(self
214 .as_ref()
215 .cast(DType::Primitive(PType::U16, nullability))?
216 .to_primitive());
217 }
218
219 if max <= u32::MAX as i64 {
220 return Ok(self
221 .as_ref()
222 .cast(DType::Primitive(PType::U32, nullability))?
223 .to_primitive());
224 }
225 }
226
227 Ok(self.to_owned())
228 }
229}
230impl<T: TypedArrayRef<Primitive>> PrimitiveArrayExt for T {}
231
232impl PrimitiveData {
234 pub(super) fn make_slots(validity: &Validity, len: usize) -> Vec<Option<ArrayRef>> {
236 vec![validity_to_child(validity, len)]
237 }
238
239 pub unsafe fn new_unchecked_from_handle(
246 handle: BufferHandle,
247 ptype: PType,
248 _validity: Validity,
249 ) -> Self {
250 Self {
251 ptype,
252 buffer: handle,
253 }
254 }
255
256 pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
263 let buffer = buffer.into();
264 Self::try_new(buffer, validity).vortex_expect("PrimitiveArray construction failed")
265 }
266
267 #[inline]
276 pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
277 Self::validate(&buffer, &validity)?;
278
279 Ok(unsafe { Self::new_unchecked(buffer, validity) })
281 }
282
283 #[inline]
296 pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, _validity: Validity) -> Self {
297 #[cfg(debug_assertions)]
298 Self::validate(&buffer, &_validity)
299 .vortex_expect("[Debug Assertion]: Invalid `PrimitiveArray` parameters");
300
301 Self {
302 ptype: T::PTYPE,
303 buffer: BufferHandle::new_host(buffer.into_byte_buffer()),
304 }
305 }
306
307 #[inline]
311 pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
312 if let Some(len) = validity.maybe_len()
313 && buffer.len() != len
314 {
315 return Err(vortex_err!(
316 InvalidArgument:
317 "Buffer and validity length mismatch: buffer={}, validity={}",
318 buffer.len(),
319 len
320 ));
321 }
322 Ok(())
323 }
324
325 pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
326 Self::new(Buffer::<T>::empty(), nullability.into())
327 }
328}
329
330impl Array<Primitive> {
331 pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
332 let dtype = DType::Primitive(T::PTYPE, nullability);
333 let len = 0;
334 let data = PrimitiveData::empty::<T>(nullability);
335 let slots = PrimitiveData::make_slots(&Validity::from(nullability), len);
336 unsafe {
337 Array::from_parts_unchecked(
338 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
339 )
340 }
341 }
342
343 pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
349 let buffer = buffer.into();
350 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
351 let len = buffer.len();
352 let slots = PrimitiveData::make_slots(&validity, len);
353 let data = PrimitiveData::new(buffer, validity);
354 unsafe {
355 Array::from_parts_unchecked(
356 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
357 )
358 }
359 }
360
361 pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
363 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
364 let len = buffer.len();
365 let slots = PrimitiveData::make_slots(&validity, len);
366 let data = PrimitiveData::try_new(buffer, validity)?;
367 Ok(unsafe {
368 Array::from_parts_unchecked(
369 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
370 )
371 })
372 }
373
374 pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> Self {
380 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
381 let len = buffer.len();
382 let slots = PrimitiveData::make_slots(&validity, len);
383 let data = unsafe { PrimitiveData::new_unchecked(buffer, validity) };
384 unsafe {
385 Array::from_parts_unchecked(
386 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
387 )
388 }
389 }
390
391 pub unsafe fn new_unchecked_from_handle(
397 handle: BufferHandle,
398 ptype: PType,
399 validity: Validity,
400 ) -> Self {
401 let dtype = DType::Primitive(ptype, validity.nullability());
402 let len = handle.len() / ptype.byte_width();
403 let slots = PrimitiveData::make_slots(&validity, len);
404 let data = unsafe { PrimitiveData::new_unchecked_from_handle(handle, ptype, validity) };
405 unsafe {
406 Array::from_parts_unchecked(
407 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
408 )
409 }
410 }
411
412 pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, validity: Validity) -> Self {
414 let dtype = DType::Primitive(ptype, validity.nullability());
415 let len = handle.len() / ptype.byte_width();
416 let slots = PrimitiveData::make_slots(&validity, len);
417 let data = PrimitiveData::from_buffer_handle(handle, ptype, validity);
418 Array::try_from_parts(ArrayParts::new(Primitive, dtype, len, data).with_slots(slots))
419 .vortex_expect("PrimitiveData is always valid")
420 }
421
422 pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
424 let dtype = DType::Primitive(ptype, validity.nullability());
425 let len = buffer.len() / ptype.byte_width();
426 let slots = PrimitiveData::make_slots(&validity, len);
427 let data = PrimitiveData::from_byte_buffer(buffer, ptype, validity);
428 unsafe {
429 Array::from_parts_unchecked(
430 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
431 )
432 }
433 }
434
435 pub fn from_values_byte_buffer(
437 valid_elems_buffer: ByteBuffer,
438 ptype: PType,
439 validity: Validity,
440 n_rows: usize,
441 ) -> Self {
442 let dtype = DType::Primitive(ptype, validity.nullability());
443 let len = n_rows;
444 let slots = PrimitiveData::make_slots(&validity, len);
445 let data =
446 PrimitiveData::from_values_byte_buffer(valid_elems_buffer, ptype, validity, n_rows);
447 unsafe {
448 Array::from_parts_unchecked(
449 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
450 )
451 }
452 }
453
454 pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
456 PrimitiveData::validate(buffer, validity)
457 }
458
459 pub fn into_data_parts(self) -> PrimitiveDataParts {
460 let validity = PrimitiveArrayExt::validity(&self);
461 let ptype = PrimitiveArrayExt::ptype(&self);
462 let data = self.into_data();
463 PrimitiveDataParts {
464 ptype,
465 buffer: data.buffer,
466 validity,
467 }
468 }
469
470 pub fn map_each_with_validity<T, R, F>(self, f: F) -> VortexResult<Self>
471 where
472 T: NativePType,
473 R: NativePType,
474 F: FnMut((T, bool)) -> R,
475 {
476 let validity = PrimitiveArrayExt::validity(&self);
477 let data = self.into_data();
478 let buf_iter = data.to_buffer::<T>().into_iter();
479
480 let buffer = match &validity {
481 Validity::NonNullable | Validity::AllValid => {
482 BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(true)).map(f))
483 }
484 Validity::AllInvalid => {
485 BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(false)).map(f))
486 }
487 Validity::Array(val) => {
488 let val = val.to_bool().into_bit_buffer();
489 BufferMut::<R>::from_iter(buf_iter.zip(val.iter()).map(f))
490 }
491 };
492 Ok(PrimitiveArray::new(buffer.freeze(), validity))
493 }
494}
495
496impl PrimitiveData {
497 pub fn len(&self) -> usize {
498 self.buffer.len() / self.ptype.byte_width()
499 }
500
501 pub fn is_empty(&self) -> bool {
503 self.buffer.is_empty()
504 }
505
506 pub fn ptype(&self) -> PType {
507 self.ptype
508 }
509
510 pub fn buffer_handle(&self) -> &BufferHandle {
512 &self.buffer
513 }
514
515 pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, _validity: Validity) -> Self {
516 Self {
517 ptype,
518 buffer: handle,
519 }
520 }
521
522 pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
523 match_each_native_ptype!(ptype, |T| {
524 Self::new::<T>(Buffer::from_byte_buffer(buffer), validity)
525 })
526 }
527
528 pub fn from_values_byte_buffer(
530 valid_elems_buffer: ByteBuffer,
531 ptype: PType,
532 validity: Validity,
533 n_rows: usize,
534 ) -> Self {
535 let byte_width = ptype.byte_width();
536 let alignment = Alignment::new(byte_width);
537 let buffer = match &validity {
538 Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment),
539 Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment),
540 Validity::Array(is_valid) => {
541 let bool_array = is_valid.to_bool();
542 let bool_buffer = bool_array.to_bit_buffer();
543 let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment);
544 for (i, valid_i) in bool_buffer.set_indices().enumerate() {
545 bytes[valid_i * byte_width..(valid_i + 1) * byte_width]
546 .copy_from_slice(&valid_elems_buffer[i * byte_width..(i + 1) * byte_width])
547 }
548 bytes.freeze()
549 }
550 };
551
552 Self::from_byte_buffer(buffer, ptype, validity)
553 }
554
555 pub fn to_buffer<T: NativePType>(&self) -> Buffer<T> {
559 if T::PTYPE != self.ptype() {
560 vortex_panic!(
561 "Attempted to get buffer of type {} from array of type {}",
562 T::PTYPE,
563 self.ptype()
564 )
565 }
566 Buffer::from_byte_buffer(self.buffer_handle().to_host_sync())
567 }
568
569 pub fn into_buffer<T: NativePType>(self) -> Buffer<T> {
571 if T::PTYPE != self.ptype() {
572 vortex_panic!(
573 "Attempted to get buffer of type {} from array of type {}",
574 T::PTYPE,
575 self.ptype()
576 )
577 }
578 Buffer::from_byte_buffer(self.buffer.into_host_sync())
579 }
580
581 pub fn into_buffer_mut<T: NativePType>(self) -> BufferMut<T> {
584 self.try_into_buffer_mut()
585 .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
586 }
587
588 pub fn try_into_buffer_mut<T: NativePType>(self) -> Result<BufferMut<T>, Buffer<T>> {
590 if T::PTYPE != self.ptype() {
591 vortex_panic!(
592 "Attempted to get buffer_mut of type {} from array of type {}",
593 T::PTYPE,
594 self.ptype()
595 )
596 }
597 let buffer = Buffer::<T>::from_byte_buffer(self.buffer.into_host_sync());
598 buffer.try_into_mut()
599 }
600}