1use std::fmt::Display;
5use std::fmt::Formatter;
6use std::iter;
7
8use vortex_buffer::Alignment;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_err;
16use vortex_error::vortex_panic;
17
18use crate::LEGACY_SESSION;
19use crate::ToCanonical;
20use crate::VortexSessionExecute;
21use crate::array::Array;
22use crate::array::ArrayParts;
23use crate::array::TypedArrayRef;
24use crate::arrays::Primitive;
25use crate::arrays::PrimitiveArray;
26use crate::dtype::DType;
27use crate::dtype::NativePType;
28use crate::dtype::Nullability;
29use crate::dtype::PType;
30use crate::match_each_native_ptype;
31use crate::validity::Validity;
32
33mod accessor;
34mod cast;
35mod conversion;
36mod patch;
37mod top_value;
38
39pub use patch::chunk_range;
40pub use patch::patch_chunk;
41
42use crate::ArrayRef;
43use crate::aggregate_fn::fns::min_max::min_max;
44use crate::array::child_to_validity;
45use crate::array::validity_to_child;
46use crate::arrays::bool::BoolArrayExt;
47use crate::buffer::BufferHandle;
48use crate::builtins::ArrayBuiltins;
49
50pub(super) const VALIDITY_SLOT: usize = 0;
52pub(super) const NUM_SLOTS: usize = 1;
53pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["validity"];
54
/// Encoding-specific data of a primitive array: the element type and the raw value bytes.
///
/// Validity is not stored here; the array keeps it in its validity slot (built by
/// `PrimitiveData::make_slots`).
#[derive(Clone, Debug)]
pub struct PrimitiveData {
    /// Element type of the values held in `buffer`.
    pub(super) ptype: PType,
    /// Backing bytes of the values; the element count is `buffer.len() / ptype.byte_width()`.
    pub(super) buffer: BufferHandle,
}
92
93impl Display for PrimitiveData {
94 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
95 write!(f, "ptype: {}", self.ptype)
96 }
97}
98
/// Owned components of a primitive array, as produced by `Array::<Primitive>::into_data_parts`.
pub struct PrimitiveDataParts {
    /// Element type of the values.
    pub ptype: PType,
    /// Handle to the raw value bytes.
    pub buffer: BufferHandle,
    /// Per-element validity, reconstructed from the array's validity slot.
    pub validity: Validity,
}
104
105pub trait PrimitiveArrayExt: TypedArrayRef<Primitive> {
106 fn ptype(&self) -> PType {
107 match self.as_ref().dtype() {
108 DType::Primitive(ptype, _) => *ptype,
109 _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
110 }
111 }
112
113 fn nullability(&self) -> Nullability {
114 match self.as_ref().dtype() {
115 DType::Primitive(_, nullability) => *nullability,
116 _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
117 }
118 }
119
120 fn validity_child(&self) -> Option<&ArrayRef> {
121 self.as_ref().slots()[VALIDITY_SLOT].as_ref()
122 }
123
124 fn validity(&self) -> Validity {
125 child_to_validity(&self.as_ref().slots()[VALIDITY_SLOT], self.nullability())
126 }
127
128 fn buffer_handle(&self) -> &BufferHandle {
129 &self.buffer
130 }
131
132 fn reinterpret_cast(&self, ptype: PType) -> PrimitiveArray {
133 if self.ptype() == ptype {
134 return self.to_owned();
135 }
136
137 assert_eq!(
138 self.ptype().byte_width(),
139 ptype.byte_width(),
140 "can't reinterpret cast between integers of two different widths"
141 );
142
143 PrimitiveArray::from_buffer_handle(self.buffer_handle().clone(), ptype, self.validity())
144 }
145
146 fn narrow(&self) -> VortexResult<PrimitiveArray> {
148 if !self.ptype().is_int() {
149 return Ok(self.to_owned());
150 }
151
152 let mut ctx = LEGACY_SESSION.create_execution_ctx();
153 let Some(min_max) = min_max(self.as_ref(), &mut ctx)? else {
154 return Ok(PrimitiveArray::new(
155 Buffer::<u8>::zeroed(self.len()),
156 self.validity(),
157 ));
158 };
159
160 let Ok(min) = min_max
163 .min
164 .cast(&PType::I64.into())
165 .and_then(|s| i64::try_from(&s))
166 else {
167 return Ok(self.to_owned());
168 };
169 let Ok(max) = min_max
170 .max
171 .cast(&PType::I64.into())
172 .and_then(|s| i64::try_from(&s))
173 else {
174 return Ok(self.to_owned());
175 };
176
177 let nullability = self.as_ref().dtype().nullability();
178
179 if min < 0 || max < 0 {
180 if min >= i8::MIN as i64 && max <= i8::MAX as i64 {
182 return Ok(self
183 .as_ref()
184 .cast(DType::Primitive(PType::I8, nullability))?
185 .to_primitive());
186 }
187
188 if min >= i16::MIN as i64 && max <= i16::MAX as i64 {
189 return Ok(self
190 .as_ref()
191 .cast(DType::Primitive(PType::I16, nullability))?
192 .to_primitive());
193 }
194
195 if min >= i32::MIN as i64 && max <= i32::MAX as i64 {
196 return Ok(self
197 .as_ref()
198 .cast(DType::Primitive(PType::I32, nullability))?
199 .to_primitive());
200 }
201 } else {
202 if max <= u8::MAX as i64 {
204 return Ok(self
205 .as_ref()
206 .cast(DType::Primitive(PType::U8, nullability))?
207 .to_primitive());
208 }
209
210 if max <= u16::MAX as i64 {
211 return Ok(self
212 .as_ref()
213 .cast(DType::Primitive(PType::U16, nullability))?
214 .to_primitive());
215 }
216
217 if max <= u32::MAX as i64 {
218 return Ok(self
219 .as_ref()
220 .cast(DType::Primitive(PType::U32, nullability))?
221 .to_primitive());
222 }
223 }
224
225 Ok(self.to_owned())
226 }
227}
228impl<T: TypedArrayRef<Primitive>> PrimitiveArrayExt for T {}
229
230impl PrimitiveData {
232 pub(super) fn make_slots(validity: &Validity, len: usize) -> Vec<Option<ArrayRef>> {
234 vec![validity_to_child(validity, len)]
235 }
236
237 pub unsafe fn new_unchecked_from_handle(
244 handle: BufferHandle,
245 ptype: PType,
246 _validity: Validity,
247 ) -> Self {
248 Self {
249 ptype,
250 buffer: handle,
251 }
252 }
253
254 pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
261 let buffer = buffer.into();
262 Self::try_new(buffer, validity).vortex_expect("PrimitiveArray construction failed")
263 }
264
265 #[inline]
274 pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
275 Self::validate(&buffer, &validity)?;
276
277 Ok(unsafe { Self::new_unchecked(buffer, validity) })
279 }
280
281 #[inline]
294 pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, _validity: Validity) -> Self {
295 #[cfg(debug_assertions)]
296 Self::validate(&buffer, &_validity)
297 .vortex_expect("[Debug Assertion]: Invalid `PrimitiveArray` parameters");
298
299 Self {
300 ptype: T::PTYPE,
301 buffer: BufferHandle::new_host(buffer.into_byte_buffer()),
302 }
303 }
304
305 #[inline]
309 pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
310 if let Some(len) = validity.maybe_len()
311 && buffer.len() != len
312 {
313 return Err(vortex_err!(
314 InvalidArgument:
315 "Buffer and validity length mismatch: buffer={}, validity={}",
316 buffer.len(),
317 len
318 ));
319 }
320 Ok(())
321 }
322
323 pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
324 Self::new(Buffer::<T>::empty(), nullability.into())
325 }
326}
327
328impl Array<Primitive> {
329 pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
330 let dtype = DType::Primitive(T::PTYPE, nullability);
331 let len = 0;
332 let data = PrimitiveData::empty::<T>(nullability);
333 let slots = PrimitiveData::make_slots(&Validity::from(nullability), len);
334 unsafe {
335 Array::from_parts_unchecked(
336 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
337 )
338 }
339 }
340
341 pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
347 let buffer = buffer.into();
348 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
349 let len = buffer.len();
350 let slots = PrimitiveData::make_slots(&validity, len);
351 let data = PrimitiveData::new(buffer, validity);
352 unsafe {
353 Array::from_parts_unchecked(
354 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
355 )
356 }
357 }
358
359 pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
361 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
362 let len = buffer.len();
363 let slots = PrimitiveData::make_slots(&validity, len);
364 let data = PrimitiveData::try_new(buffer, validity)?;
365 Ok(unsafe {
366 Array::from_parts_unchecked(
367 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
368 )
369 })
370 }
371
372 pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> Self {
378 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
379 let len = buffer.len();
380 let slots = PrimitiveData::make_slots(&validity, len);
381 let data = unsafe { PrimitiveData::new_unchecked(buffer, validity) };
382 unsafe {
383 Array::from_parts_unchecked(
384 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
385 )
386 }
387 }
388
389 pub unsafe fn new_unchecked_from_handle(
395 handle: BufferHandle,
396 ptype: PType,
397 validity: Validity,
398 ) -> Self {
399 let dtype = DType::Primitive(ptype, validity.nullability());
400 let len = handle.len() / ptype.byte_width();
401 let slots = PrimitiveData::make_slots(&validity, len);
402 let data = unsafe { PrimitiveData::new_unchecked_from_handle(handle, ptype, validity) };
403 unsafe {
404 Array::from_parts_unchecked(
405 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
406 )
407 }
408 }
409
410 pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, validity: Validity) -> Self {
412 let dtype = DType::Primitive(ptype, validity.nullability());
413 let len = handle.len() / ptype.byte_width();
414 let slots = PrimitiveData::make_slots(&validity, len);
415 let data = PrimitiveData::from_buffer_handle(handle, ptype, validity);
416 Array::try_from_parts(ArrayParts::new(Primitive, dtype, len, data).with_slots(slots))
417 .vortex_expect("PrimitiveData is always valid")
418 }
419
420 pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
422 let dtype = DType::Primitive(ptype, validity.nullability());
423 let len = buffer.len() / ptype.byte_width();
424 let slots = PrimitiveData::make_slots(&validity, len);
425 let data = PrimitiveData::from_byte_buffer(buffer, ptype, validity);
426 unsafe {
427 Array::from_parts_unchecked(
428 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
429 )
430 }
431 }
432
433 pub fn from_values_byte_buffer(
435 valid_elems_buffer: ByteBuffer,
436 ptype: PType,
437 validity: Validity,
438 n_rows: usize,
439 ) -> Self {
440 let dtype = DType::Primitive(ptype, validity.nullability());
441 let len = n_rows;
442 let slots = PrimitiveData::make_slots(&validity, len);
443 let data =
444 PrimitiveData::from_values_byte_buffer(valid_elems_buffer, ptype, validity, n_rows);
445 unsafe {
446 Array::from_parts_unchecked(
447 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
448 )
449 }
450 }
451
452 pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
454 PrimitiveData::validate(buffer, validity)
455 }
456
457 pub fn into_data_parts(self) -> PrimitiveDataParts {
458 let validity = PrimitiveArrayExt::validity(&self);
459 let ptype = PrimitiveArrayExt::ptype(&self);
460 let data = self.into_data();
461 PrimitiveDataParts {
462 ptype,
463 buffer: data.buffer,
464 validity,
465 }
466 }
467
468 pub fn map_each_with_validity<T, R, F>(self, f: F) -> VortexResult<Self>
469 where
470 T: NativePType,
471 R: NativePType,
472 F: FnMut((T, bool)) -> R,
473 {
474 let validity = PrimitiveArrayExt::validity(&self);
475 let data = self.into_data();
476 let buf_iter = data.to_buffer::<T>().into_iter();
477
478 let buffer = match &validity {
479 Validity::NonNullable | Validity::AllValid => {
480 BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(true)).map(f))
481 }
482 Validity::AllInvalid => {
483 BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(false)).map(f))
484 }
485 Validity::Array(val) => {
486 let val = val.to_bool().into_bit_buffer();
487 BufferMut::<R>::from_iter(buf_iter.zip(val.iter()).map(f))
488 }
489 };
490 Ok(PrimitiveArray::new(buffer.freeze(), validity))
491 }
492}
493
494impl PrimitiveData {
495 pub fn len(&self) -> usize {
496 self.buffer.len() / self.ptype.byte_width()
497 }
498
499 pub fn is_empty(&self) -> bool {
501 self.buffer.is_empty()
502 }
503
504 pub fn ptype(&self) -> PType {
505 self.ptype
506 }
507
508 pub fn buffer_handle(&self) -> &BufferHandle {
510 &self.buffer
511 }
512
513 pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, _validity: Validity) -> Self {
514 Self {
515 ptype,
516 buffer: handle,
517 }
518 }
519
520 pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
521 match_each_native_ptype!(ptype, |T| {
522 Self::new::<T>(Buffer::from_byte_buffer(buffer), validity)
523 })
524 }
525
526 pub fn from_values_byte_buffer(
528 valid_elems_buffer: ByteBuffer,
529 ptype: PType,
530 validity: Validity,
531 n_rows: usize,
532 ) -> Self {
533 let byte_width = ptype.byte_width();
534 let alignment = Alignment::new(byte_width);
535 let buffer = match &validity {
536 Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment),
537 Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment),
538 Validity::Array(is_valid) => {
539 let bool_array = is_valid.to_bool();
540 let bool_buffer = bool_array.to_bit_buffer();
541 let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment);
542 for (i, valid_i) in bool_buffer.set_indices().enumerate() {
543 bytes[valid_i * byte_width..(valid_i + 1) * byte_width]
544 .copy_from_slice(&valid_elems_buffer[i * byte_width..(i + 1) * byte_width])
545 }
546 bytes.freeze()
547 }
548 };
549
550 Self::from_byte_buffer(buffer, ptype, validity)
551 }
552
553 pub fn to_buffer<T: NativePType>(&self) -> Buffer<T> {
557 if T::PTYPE != self.ptype() {
558 vortex_panic!(
559 "Attempted to get buffer of type {} from array of type {}",
560 T::PTYPE,
561 self.ptype()
562 )
563 }
564 Buffer::from_byte_buffer(self.buffer_handle().to_host_sync())
565 }
566
567 pub fn into_buffer<T: NativePType>(self) -> Buffer<T> {
569 if T::PTYPE != self.ptype() {
570 vortex_panic!(
571 "Attempted to get buffer of type {} from array of type {}",
572 T::PTYPE,
573 self.ptype()
574 )
575 }
576 Buffer::from_byte_buffer(self.buffer.into_host_sync())
577 }
578
579 pub fn into_buffer_mut<T: NativePType>(self) -> BufferMut<T> {
582 self.try_into_buffer_mut()
583 .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
584 }
585
586 pub fn try_into_buffer_mut<T: NativePType>(self) -> Result<BufferMut<T>, Buffer<T>> {
588 if T::PTYPE != self.ptype() {
589 vortex_panic!(
590 "Attempted to get buffer_mut of type {} from array of type {}",
591 T::PTYPE,
592 self.ptype()
593 )
594 }
595 let buffer = Buffer::<T>::from_byte_buffer(self.buffer.into_host_sync());
596 buffer.try_into_mut()
597 }
598}