1use std::fmt::Display;
5use std::fmt::Formatter;
6use std::iter::repeat;
7
8use smallvec::smallvec;
9use vortex_buffer::Alignment;
10use vortex_buffer::Buffer;
11use vortex_buffer::BufferMut;
12use vortex_buffer::ByteBuffer;
13use vortex_buffer::ByteBufferMut;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_err;
17use vortex_error::vortex_panic;
18
19use crate::ArraySlots;
20use crate::ExecutionCtx;
21use crate::array::Array;
22use crate::array::ArrayParts;
23use crate::array::TypedArrayRef;
24use crate::arrays::BoolArray;
25use crate::arrays::Primitive;
26use crate::arrays::PrimitiveArray;
27use crate::dtype::DType;
28use crate::dtype::NativePType;
29use crate::dtype::Nullability;
30use crate::dtype::PType;
31use crate::match_each_native_ptype;
32use crate::validity::Validity;
33
34mod cast;
35mod conversion;
36mod patch;
37mod top_value;
38
39pub use patch::chunk_range;
40pub use patch::patch_chunk;
41
42use crate::ArrayRef;
43use crate::aggregate_fn::NumericalAggregateOpts;
44use crate::aggregate_fn::fns::min_max::min_max;
45use crate::array::child_to_validity;
46use crate::array::validity_to_child;
47use crate::arrays::bool::BoolArrayExt;
48use crate::buffer::BufferHandle;
49use crate::builtins::ArrayBuiltins;
50
51pub(super) const VALIDITY_SLOT: usize = 0;
53pub(super) const NUM_SLOTS: usize = 1;
54pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["validity"];
55
56#[derive(Clone, Debug)]
89pub struct PrimitiveData {
90 pub(super) ptype: PType,
91 pub(super) buffer: BufferHandle,
92}
93
94impl Display for PrimitiveData {
95 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96 write!(f, "ptype: {}", self.ptype)
97 }
98}
99
100pub struct PrimitiveDataParts {
101 pub ptype: PType,
102 pub buffer: BufferHandle,
103 pub validity: Validity,
104}
105
106pub trait PrimitiveArrayExt: TypedArrayRef<Primitive> {
107 fn ptype(&self) -> PType {
108 match self.as_ref().dtype() {
109 DType::Primitive(ptype, _) => *ptype,
110 _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
111 }
112 }
113
114 fn nullability(&self) -> Nullability {
115 match self.as_ref().dtype() {
116 DType::Primitive(_, nullability) => *nullability,
117 _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
118 }
119 }
120
121 fn validity_child(&self) -> Option<&ArrayRef> {
122 self.as_ref().slots()[VALIDITY_SLOT].as_ref()
123 }
124
125 fn validity(&self) -> Validity {
126 child_to_validity(
127 self.as_ref().slots()[VALIDITY_SLOT].as_ref(),
128 self.nullability(),
129 )
130 }
131
132 fn buffer_handle(&self) -> &BufferHandle {
133 &self.buffer
134 }
135
136 fn reinterpret_cast(&self, ptype: PType) -> PrimitiveArray {
137 if self.ptype() == ptype {
138 return self.to_owned();
139 }
140
141 assert_eq!(
142 self.ptype().byte_width(),
143 ptype.byte_width(),
144 "can't reinterpret cast between integers of two different widths"
145 );
146
147 PrimitiveArray::from_buffer_handle(self.buffer_handle().clone(), ptype, self.validity())
148 }
149
150 fn narrow(&self, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
152 if !self.ptype().is_int() {
153 return Ok(self.to_owned());
154 }
155
156 let Some(min_max) = min_max(self.as_ref(), ctx, NumericalAggregateOpts::default())? else {
157 return Ok(PrimitiveArray::new(
158 Buffer::<u8>::zeroed(self.len()),
159 self.validity(),
160 ));
161 };
162
163 let Ok(min) = min_max
166 .min
167 .cast(&PType::I64.into())
168 .and_then(|s| i64::try_from(&s))
169 else {
170 return Ok(self.to_owned());
171 };
172 let Ok(max) = min_max
173 .max
174 .cast(&PType::I64.into())
175 .and_then(|s| i64::try_from(&s))
176 else {
177 return Ok(self.to_owned());
178 };
179
180 let nullability = self.as_ref().dtype().nullability();
181
182 if min < 0 || max < 0 {
183 if min >= i8::MIN as i64 && max <= i8::MAX as i64 {
185 let result = self
186 .as_ref()
187 .cast(DType::Primitive(PType::I8, nullability))?
188 .execute::<PrimitiveArray>(ctx)?;
189 return Ok(result);
190 }
191
192 if min >= i16::MIN as i64 && max <= i16::MAX as i64 {
193 let result = self
194 .as_ref()
195 .cast(DType::Primitive(PType::I16, nullability))?
196 .execute::<PrimitiveArray>(ctx)?;
197 return Ok(result);
198 }
199
200 if min >= i32::MIN as i64 && max <= i32::MAX as i64 {
201 let result = self
202 .as_ref()
203 .cast(DType::Primitive(PType::I32, nullability))?
204 .execute::<PrimitiveArray>(ctx)?;
205 return Ok(result);
206 }
207 } else {
208 if max <= u8::MAX as i64 {
210 let result = self
211 .as_ref()
212 .cast(DType::Primitive(PType::U8, nullability))?
213 .execute::<PrimitiveArray>(ctx)?;
214 return Ok(result);
215 }
216
217 if max <= u16::MAX as i64 {
218 let result = self
219 .as_ref()
220 .cast(DType::Primitive(PType::U16, nullability))?
221 .execute::<PrimitiveArray>(ctx)?;
222 return Ok(result);
223 }
224
225 if max <= u32::MAX as i64 {
226 let result = self
227 .as_ref()
228 .cast(DType::Primitive(PType::U32, nullability))?
229 .execute::<PrimitiveArray>(ctx)?;
230 return Ok(result);
231 }
232 }
233
234 Ok(self.to_owned())
235 }
236}
237impl<T: TypedArrayRef<Primitive>> PrimitiveArrayExt for T {}
238
239impl PrimitiveData {
241 pub(super) fn make_slots(validity: &Validity, len: usize) -> ArraySlots {
243 smallvec![validity_to_child(validity, len)]
244 }
245
246 pub unsafe fn new_unchecked_from_handle(
253 handle: BufferHandle,
254 ptype: PType,
255 _validity: Validity,
256 ) -> Self {
257 Self {
258 ptype,
259 buffer: handle,
260 }
261 }
262
263 pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
270 let buffer = buffer.into();
271 Self::try_new(buffer, validity).vortex_expect("PrimitiveArray construction failed")
272 }
273
274 #[inline]
283 pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
284 Self::validate(&buffer, &validity)?;
285
286 Ok(unsafe { Self::new_unchecked(buffer, validity) })
288 }
289
290 #[inline]
303 pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, _validity: Validity) -> Self {
304 #[cfg(debug_assertions)]
305 Self::validate(&buffer, &_validity)
306 .vortex_expect("[Debug Assertion]: Invalid `PrimitiveArray` parameters");
307
308 Self {
309 ptype: T::PTYPE,
310 buffer: BufferHandle::new_host(buffer.into_byte_buffer()),
311 }
312 }
313
314 #[inline]
318 pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
319 if let Some(len) = validity.maybe_len()
320 && buffer.len() != len
321 {
322 return Err(vortex_err!(
323 InvalidArgument:
324 "Buffer and validity length mismatch: buffer={}, validity={}",
325 buffer.len(),
326 len
327 ));
328 }
329 Ok(())
330 }
331
332 pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
333 Self::new(Buffer::<T>::empty(), nullability.into())
334 }
335}
336
337impl Array<Primitive> {
338 pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
339 let dtype = DType::Primitive(T::PTYPE, nullability);
340 let len = 0;
341 let data = PrimitiveData::empty::<T>(nullability);
342 let slots = PrimitiveData::make_slots(&Validity::from(nullability), len);
343 unsafe {
344 Array::from_parts_unchecked(
345 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
346 )
347 }
348 }
349
350 pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
356 let buffer = buffer.into();
357 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
358 let len = buffer.len();
359 let slots = PrimitiveData::make_slots(&validity, len);
360 let data = PrimitiveData::new(buffer, validity);
361 unsafe {
362 Array::from_parts_unchecked(
363 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
364 )
365 }
366 }
367
368 pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
370 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
371 let len = buffer.len();
372 let slots = PrimitiveData::make_slots(&validity, len);
373 let data = PrimitiveData::try_new(buffer, validity)?;
374 Ok(unsafe {
375 Array::from_parts_unchecked(
376 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
377 )
378 })
379 }
380
381 pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> Self {
387 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
388 let len = buffer.len();
389 let slots = PrimitiveData::make_slots(&validity, len);
390 let data = unsafe { PrimitiveData::new_unchecked(buffer, validity) };
391 unsafe {
392 Array::from_parts_unchecked(
393 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
394 )
395 }
396 }
397
398 pub unsafe fn new_unchecked_from_handle(
404 handle: BufferHandle,
405 ptype: PType,
406 validity: Validity,
407 ) -> Self {
408 let dtype = DType::Primitive(ptype, validity.nullability());
409 let len = handle.len() / ptype.byte_width();
410 let slots = PrimitiveData::make_slots(&validity, len);
411 let data = unsafe { PrimitiveData::new_unchecked_from_handle(handle, ptype, validity) };
412 unsafe {
413 Array::from_parts_unchecked(
414 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
415 )
416 }
417 }
418
419 pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, validity: Validity) -> Self {
421 let dtype = DType::Primitive(ptype, validity.nullability());
422 let len = handle.len() / ptype.byte_width();
423 let slots = PrimitiveData::make_slots(&validity, len);
424 let data = PrimitiveData::from_buffer_handle(handle, ptype, validity);
425 Array::try_from_parts(ArrayParts::new(Primitive, dtype, len, data).with_slots(slots))
426 .vortex_expect("PrimitiveData is always valid")
427 }
428
429 pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
431 let dtype = DType::Primitive(ptype, validity.nullability());
432 let len = buffer.len() / ptype.byte_width();
433 let slots = PrimitiveData::make_slots(&validity, len);
434 let data = PrimitiveData::from_byte_buffer(buffer, ptype, validity);
435 unsafe {
436 Array::from_parts_unchecked(
437 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
438 )
439 }
440 }
441
442 pub fn from_values_byte_buffer(
444 valid_elems_buffer: ByteBuffer,
445 ptype: PType,
446 validity: Validity,
447 n_rows: usize,
448 ctx: &mut ExecutionCtx,
449 ) -> Self {
450 let dtype = DType::Primitive(ptype, validity.nullability());
451 let len = n_rows;
452 let slots = PrimitiveData::make_slots(&validity, len);
453 let data = PrimitiveData::from_values_byte_buffer(
454 valid_elems_buffer,
455 ptype,
456 validity,
457 n_rows,
458 ctx,
459 );
460 unsafe {
461 Array::from_parts_unchecked(
462 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
463 )
464 }
465 }
466
467 pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
469 PrimitiveData::validate(buffer, validity)
470 }
471
472 pub fn into_data_parts(self) -> PrimitiveDataParts {
473 let validity = PrimitiveArrayExt::validity(&self);
474 let ptype = PrimitiveArrayExt::ptype(&self);
475 let data = self.into_data();
476 PrimitiveDataParts {
477 ptype,
478 buffer: data.buffer,
479 validity,
480 }
481 }
482
483 pub fn map_each_with_validity<T, R, F>(self, ctx: &mut ExecutionCtx, f: F) -> VortexResult<Self>
484 where
485 T: NativePType,
486 R: NativePType,
487 F: FnMut((T, bool)) -> R,
488 {
489 let validity = PrimitiveArrayExt::validity(&self);
490 let data = self.into_data();
491 let buf_iter = data.to_buffer::<T>().into_iter();
492
493 let buffer = match &validity {
494 Validity::NonNullable | Validity::AllValid => {
495 Buffer::<R>::from_trusted_len_iter(buf_iter.zip(repeat(true)).map(f))
496 }
497 Validity::AllInvalid => {
498 Buffer::<R>::from_trusted_len_iter(buf_iter.zip(repeat(false)).map(f))
499 }
500 Validity::Array(val) => {
501 let val = val.clone().execute::<BoolArray>(ctx)?.into_bit_buffer();
502 Buffer::<R>::from_trusted_len_iter(buf_iter.zip(val.iter()).map(f))
503 }
504 };
505 Ok(PrimitiveArray::new(buffer, validity))
506 }
507}
508
509impl PrimitiveData {
510 pub fn len(&self) -> usize {
511 self.buffer.len() / self.ptype.byte_width()
512 }
513
514 pub fn is_empty(&self) -> bool {
516 self.buffer.is_empty()
517 }
518
519 pub fn ptype(&self) -> PType {
520 self.ptype
521 }
522
523 pub fn buffer_handle(&self) -> &BufferHandle {
525 &self.buffer
526 }
527
528 pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, _validity: Validity) -> Self {
529 Self {
530 ptype,
531 buffer: handle,
532 }
533 }
534
535 pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
536 match_each_native_ptype!(ptype, |T| {
537 Self::new::<T>(Buffer::from_byte_buffer(buffer), validity)
538 })
539 }
540
541 pub fn from_values_byte_buffer(
543 valid_elems_buffer: ByteBuffer,
544 ptype: PType,
545 validity: Validity,
546 n_rows: usize,
547 ctx: &mut ExecutionCtx,
548 ) -> Self {
549 let byte_width = ptype.byte_width();
550 let alignment = Alignment::new(byte_width);
551 let buffer = match &validity {
552 Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment),
553 Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment),
554 Validity::Array(is_valid) => {
555 let bool_array = is_valid
556 .clone()
557 .execute::<BoolArray>(ctx)
558 .vortex_expect("must be a bool array");
559 let bool_buffer = bool_array.bit_buffer_view();
560 let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment);
561 for (i, valid_i) in bool_buffer.set_indices().enumerate() {
562 bytes[valid_i * byte_width..(valid_i + 1) * byte_width]
563 .copy_from_slice(&valid_elems_buffer[i * byte_width..(i + 1) * byte_width])
564 }
565 bytes.freeze()
566 }
567 };
568
569 Self::from_byte_buffer(buffer, ptype, validity)
570 }
571
572 pub fn to_buffer<T: NativePType>(&self) -> Buffer<T> {
576 if T::PTYPE != self.ptype() {
577 vortex_panic!(
578 "Attempted to get buffer of type {} from array of type {}",
579 T::PTYPE,
580 self.ptype()
581 )
582 }
583 Buffer::from_byte_buffer(self.buffer_handle().to_host_sync())
584 }
585
586 pub fn into_buffer<T: NativePType>(self) -> Buffer<T> {
588 if T::PTYPE != self.ptype() {
589 vortex_panic!(
590 "Attempted to get buffer of type {} from array of type {}",
591 T::PTYPE,
592 self.ptype()
593 )
594 }
595 Buffer::from_byte_buffer(self.buffer.into_host_sync())
596 }
597
598 pub fn into_buffer_mut<T: NativePType>(self) -> BufferMut<T> {
601 self.try_into_buffer_mut()
602 .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
603 }
604
605 pub fn try_into_buffer_mut<T: NativePType>(self) -> Result<BufferMut<T>, Buffer<T>> {
610 if T::PTYPE != self.ptype() {
611 vortex_panic!(
612 "Attempted to get buffer_mut of type {} from array of type {}",
613 T::PTYPE,
614 self.ptype()
615 )
616 }
617 let buffer = Buffer::<T>::from_byte_buffer(self.buffer.into_host_sync());
618 buffer.try_into_mut()
619 }
620}