1use std::fmt::Display;
5use std::fmt::Formatter;
6use std::iter;
7
8use smallvec::smallvec;
9use vortex_buffer::Alignment;
10use vortex_buffer::Buffer;
11use vortex_buffer::BufferMut;
12use vortex_buffer::ByteBuffer;
13use vortex_buffer::ByteBufferMut;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_err;
17use vortex_error::vortex_panic;
18
19use crate::ArraySlots;
20use crate::LEGACY_SESSION;
21#[expect(deprecated)]
22use crate::ToCanonical as _;
23use crate::VortexSessionExecute;
24use crate::array::Array;
25use crate::array::ArrayParts;
26use crate::array::TypedArrayRef;
27use crate::arrays::Primitive;
28use crate::arrays::PrimitiveArray;
29use crate::dtype::DType;
30use crate::dtype::NativePType;
31use crate::dtype::Nullability;
32use crate::dtype::PType;
33use crate::match_each_native_ptype;
34use crate::validity::Validity;
35
36mod accessor;
37mod cast;
38mod conversion;
39mod patch;
40mod top_value;
41
42pub use patch::chunk_range;
43pub use patch::patch_chunk;
44
45use crate::ArrayRef;
46use crate::aggregate_fn::fns::min_max::min_max;
47use crate::array::child_to_validity;
48use crate::array::validity_to_child;
49use crate::arrays::bool::BoolArrayExt;
50use crate::buffer::BufferHandle;
51use crate::builtins::ArrayBuiltins;
52
/// Position of the validity child within a primitive array's slots.
pub(super) const VALIDITY_SLOT: usize = 0;
/// Number of slots a primitive array carries (only the validity slot).
pub(super) const NUM_SLOTS: usize = 1;
/// Names of the slots, indexed by slot position.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["validity"];
57
/// Element type and raw value storage backing a primitive array.
///
/// Validity is not stored in this struct: the constructors in this file take a
/// [`Validity`] only to check it against the buffer (or ignore it), while the
/// array itself keeps validity in its `VALIDITY_SLOT` slot.
#[derive(Clone, Debug)]
pub struct PrimitiveData {
    /// Element type used to interpret `buffer`.
    pub(super) ptype: PType,
    /// Element bytes; `len()` is the byte length divided by `ptype.byte_width()`.
    pub(super) buffer: BufferHandle,
}
95
96impl Display for PrimitiveData {
97 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
98 write!(f, "ptype: {}", self.ptype)
99 }
100}
101
/// Owned components of a primitive array, as produced by
/// `Array::<Primitive>::into_data_parts`.
pub struct PrimitiveDataParts {
    /// Element type of the values in `buffer`.
    pub ptype: PType,
    /// Handle to the buffer holding the element bytes.
    pub buffer: BufferHandle,
    /// Validity reconstructed from the array's validity slot and nullability.
    pub validity: Validity,
}
107
108pub trait PrimitiveArrayExt: TypedArrayRef<Primitive> {
109 fn ptype(&self) -> PType {
110 match self.as_ref().dtype() {
111 DType::Primitive(ptype, _) => *ptype,
112 _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
113 }
114 }
115
116 fn nullability(&self) -> Nullability {
117 match self.as_ref().dtype() {
118 DType::Primitive(_, nullability) => *nullability,
119 _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
120 }
121 }
122
123 fn validity_child(&self) -> Option<&ArrayRef> {
124 self.as_ref().slots()[VALIDITY_SLOT].as_ref()
125 }
126
127 fn validity(&self) -> Validity {
128 child_to_validity(
129 self.as_ref().slots()[VALIDITY_SLOT].as_ref(),
130 self.nullability(),
131 )
132 }
133
134 fn buffer_handle(&self) -> &BufferHandle {
135 &self.buffer
136 }
137
138 fn reinterpret_cast(&self, ptype: PType) -> PrimitiveArray {
139 if self.ptype() == ptype {
140 return self.to_owned();
141 }
142
143 assert_eq!(
144 self.ptype().byte_width(),
145 ptype.byte_width(),
146 "can't reinterpret cast between integers of two different widths"
147 );
148
149 PrimitiveArray::from_buffer_handle(self.buffer_handle().clone(), ptype, self.validity())
150 }
151
152 fn narrow(&self) -> VortexResult<PrimitiveArray> {
154 if !self.ptype().is_int() {
155 return Ok(self.to_owned());
156 }
157
158 let mut ctx = LEGACY_SESSION.create_execution_ctx();
159 let Some(min_max) = min_max(self.as_ref(), &mut ctx)? else {
160 return Ok(PrimitiveArray::new(
161 Buffer::<u8>::zeroed(self.len()),
162 self.validity(),
163 ));
164 };
165
166 let Ok(min) = min_max
169 .min
170 .cast(&PType::I64.into())
171 .and_then(|s| i64::try_from(&s))
172 else {
173 return Ok(self.to_owned());
174 };
175 let Ok(max) = min_max
176 .max
177 .cast(&PType::I64.into())
178 .and_then(|s| i64::try_from(&s))
179 else {
180 return Ok(self.to_owned());
181 };
182
183 let nullability = self.as_ref().dtype().nullability();
184
185 if min < 0 || max < 0 {
186 if min >= i8::MIN as i64 && max <= i8::MAX as i64 {
188 #[expect(deprecated)]
189 let result = self
190 .as_ref()
191 .cast(DType::Primitive(PType::I8, nullability))?
192 .to_primitive();
193 return Ok(result);
194 }
195
196 if min >= i16::MIN as i64 && max <= i16::MAX as i64 {
197 #[expect(deprecated)]
198 let result = self
199 .as_ref()
200 .cast(DType::Primitive(PType::I16, nullability))?
201 .to_primitive();
202 return Ok(result);
203 }
204
205 if min >= i32::MIN as i64 && max <= i32::MAX as i64 {
206 #[expect(deprecated)]
207 let result = self
208 .as_ref()
209 .cast(DType::Primitive(PType::I32, nullability))?
210 .to_primitive();
211 return Ok(result);
212 }
213 } else {
214 if max <= u8::MAX as i64 {
216 #[expect(deprecated)]
217 let result = self
218 .as_ref()
219 .cast(DType::Primitive(PType::U8, nullability))?
220 .to_primitive();
221 return Ok(result);
222 }
223
224 if max <= u16::MAX as i64 {
225 #[expect(deprecated)]
226 let result = self
227 .as_ref()
228 .cast(DType::Primitive(PType::U16, nullability))?
229 .to_primitive();
230 return Ok(result);
231 }
232
233 if max <= u32::MAX as i64 {
234 #[expect(deprecated)]
235 let result = self
236 .as_ref()
237 .cast(DType::Primitive(PType::U32, nullability))?
238 .to_primitive();
239 return Ok(result);
240 }
241 }
242
243 Ok(self.to_owned())
244 }
245}
// Blanket impl: every typed reference to a `Primitive` array gets the extension methods.
impl<T: TypedArrayRef<Primitive>> PrimitiveArrayExt for T {}
247
248impl PrimitiveData {
250 pub(super) fn make_slots(validity: &Validity, len: usize) -> ArraySlots {
252 smallvec![validity_to_child(validity, len)]
253 }
254
255 pub unsafe fn new_unchecked_from_handle(
262 handle: BufferHandle,
263 ptype: PType,
264 _validity: Validity,
265 ) -> Self {
266 Self {
267 ptype,
268 buffer: handle,
269 }
270 }
271
272 pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
279 let buffer = buffer.into();
280 Self::try_new(buffer, validity).vortex_expect("PrimitiveArray construction failed")
281 }
282
283 #[inline]
292 pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
293 Self::validate(&buffer, &validity)?;
294
295 Ok(unsafe { Self::new_unchecked(buffer, validity) })
297 }
298
299 #[inline]
312 pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, _validity: Validity) -> Self {
313 #[cfg(debug_assertions)]
314 Self::validate(&buffer, &_validity)
315 .vortex_expect("[Debug Assertion]: Invalid `PrimitiveArray` parameters");
316
317 Self {
318 ptype: T::PTYPE,
319 buffer: BufferHandle::new_host(buffer.into_byte_buffer()),
320 }
321 }
322
323 #[inline]
327 pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
328 if let Some(len) = validity.maybe_len()
329 && buffer.len() != len
330 {
331 return Err(vortex_err!(
332 InvalidArgument:
333 "Buffer and validity length mismatch: buffer={}, validity={}",
334 buffer.len(),
335 len
336 ));
337 }
338 Ok(())
339 }
340
341 pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
342 Self::new(Buffer::<T>::empty(), nullability.into())
343 }
344}
345
346impl Array<Primitive> {
347 pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
348 let dtype = DType::Primitive(T::PTYPE, nullability);
349 let len = 0;
350 let data = PrimitiveData::empty::<T>(nullability);
351 let slots = PrimitiveData::make_slots(&Validity::from(nullability), len);
352 unsafe {
353 Array::from_parts_unchecked(
354 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
355 )
356 }
357 }
358
359 pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
365 let buffer = buffer.into();
366 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
367 let len = buffer.len();
368 let slots = PrimitiveData::make_slots(&validity, len);
369 let data = PrimitiveData::new(buffer, validity);
370 unsafe {
371 Array::from_parts_unchecked(
372 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
373 )
374 }
375 }
376
377 pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
379 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
380 let len = buffer.len();
381 let slots = PrimitiveData::make_slots(&validity, len);
382 let data = PrimitiveData::try_new(buffer, validity)?;
383 Ok(unsafe {
384 Array::from_parts_unchecked(
385 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
386 )
387 })
388 }
389
390 pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> Self {
396 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
397 let len = buffer.len();
398 let slots = PrimitiveData::make_slots(&validity, len);
399 let data = unsafe { PrimitiveData::new_unchecked(buffer, validity) };
400 unsafe {
401 Array::from_parts_unchecked(
402 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
403 )
404 }
405 }
406
407 pub unsafe fn new_unchecked_from_handle(
413 handle: BufferHandle,
414 ptype: PType,
415 validity: Validity,
416 ) -> Self {
417 let dtype = DType::Primitive(ptype, validity.nullability());
418 let len = handle.len() / ptype.byte_width();
419 let slots = PrimitiveData::make_slots(&validity, len);
420 let data = unsafe { PrimitiveData::new_unchecked_from_handle(handle, ptype, validity) };
421 unsafe {
422 Array::from_parts_unchecked(
423 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
424 )
425 }
426 }
427
428 pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, validity: Validity) -> Self {
430 let dtype = DType::Primitive(ptype, validity.nullability());
431 let len = handle.len() / ptype.byte_width();
432 let slots = PrimitiveData::make_slots(&validity, len);
433 let data = PrimitiveData::from_buffer_handle(handle, ptype, validity);
434 Array::try_from_parts(ArrayParts::new(Primitive, dtype, len, data).with_slots(slots))
435 .vortex_expect("PrimitiveData is always valid")
436 }
437
438 pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
440 let dtype = DType::Primitive(ptype, validity.nullability());
441 let len = buffer.len() / ptype.byte_width();
442 let slots = PrimitiveData::make_slots(&validity, len);
443 let data = PrimitiveData::from_byte_buffer(buffer, ptype, validity);
444 unsafe {
445 Array::from_parts_unchecked(
446 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
447 )
448 }
449 }
450
451 pub fn from_values_byte_buffer(
453 valid_elems_buffer: ByteBuffer,
454 ptype: PType,
455 validity: Validity,
456 n_rows: usize,
457 ) -> Self {
458 let dtype = DType::Primitive(ptype, validity.nullability());
459 let len = n_rows;
460 let slots = PrimitiveData::make_slots(&validity, len);
461 let data =
462 PrimitiveData::from_values_byte_buffer(valid_elems_buffer, ptype, validity, n_rows);
463 unsafe {
464 Array::from_parts_unchecked(
465 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
466 )
467 }
468 }
469
470 pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
472 PrimitiveData::validate(buffer, validity)
473 }
474
475 pub fn into_data_parts(self) -> PrimitiveDataParts {
476 let validity = PrimitiveArrayExt::validity(&self);
477 let ptype = PrimitiveArrayExt::ptype(&self);
478 let data = self.into_data();
479 PrimitiveDataParts {
480 ptype,
481 buffer: data.buffer,
482 validity,
483 }
484 }
485
486 pub fn map_each_with_validity<T, R, F>(self, f: F) -> VortexResult<Self>
487 where
488 T: NativePType,
489 R: NativePType,
490 F: FnMut((T, bool)) -> R,
491 {
492 let validity = PrimitiveArrayExt::validity(&self);
493 let data = self.into_data();
494 let buf_iter = data.to_buffer::<T>().into_iter();
495
496 let buffer = match &validity {
497 Validity::NonNullable | Validity::AllValid => {
498 BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(true)).map(f))
499 }
500 Validity::AllInvalid => {
501 BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(false)).map(f))
502 }
503 Validity::Array(val) => {
504 #[expect(deprecated)]
505 let val = val.to_bool().into_bit_buffer();
506 BufferMut::<R>::from_iter(buf_iter.zip(val.iter()).map(f))
507 }
508 };
509 Ok(PrimitiveArray::new(buffer.freeze(), validity))
510 }
511}
512
513impl PrimitiveData {
514 pub fn len(&self) -> usize {
515 self.buffer.len() / self.ptype.byte_width()
516 }
517
518 pub fn is_empty(&self) -> bool {
520 self.buffer.is_empty()
521 }
522
523 pub fn ptype(&self) -> PType {
524 self.ptype
525 }
526
527 pub fn buffer_handle(&self) -> &BufferHandle {
529 &self.buffer
530 }
531
532 pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, _validity: Validity) -> Self {
533 Self {
534 ptype,
535 buffer: handle,
536 }
537 }
538
539 pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
540 match_each_native_ptype!(ptype, |T| {
541 Self::new::<T>(Buffer::from_byte_buffer(buffer), validity)
542 })
543 }
544
545 pub fn from_values_byte_buffer(
547 valid_elems_buffer: ByteBuffer,
548 ptype: PType,
549 validity: Validity,
550 n_rows: usize,
551 ) -> Self {
552 let byte_width = ptype.byte_width();
553 let alignment = Alignment::new(byte_width);
554 let buffer = match &validity {
555 Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment),
556 Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment),
557 Validity::Array(is_valid) => {
558 #[expect(deprecated)]
559 let bool_array = is_valid.to_bool();
560 let bool_buffer = bool_array.to_bit_buffer();
561 let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment);
562 for (i, valid_i) in bool_buffer.set_indices().enumerate() {
563 bytes[valid_i * byte_width..(valid_i + 1) * byte_width]
564 .copy_from_slice(&valid_elems_buffer[i * byte_width..(i + 1) * byte_width])
565 }
566 bytes.freeze()
567 }
568 };
569
570 Self::from_byte_buffer(buffer, ptype, validity)
571 }
572
573 pub fn to_buffer<T: NativePType>(&self) -> Buffer<T> {
577 if T::PTYPE != self.ptype() {
578 vortex_panic!(
579 "Attempted to get buffer of type {} from array of type {}",
580 T::PTYPE,
581 self.ptype()
582 )
583 }
584 Buffer::from_byte_buffer(self.buffer_handle().to_host_sync())
585 }
586
587 pub fn into_buffer<T: NativePType>(self) -> Buffer<T> {
589 if T::PTYPE != self.ptype() {
590 vortex_panic!(
591 "Attempted to get buffer of type {} from array of type {}",
592 T::PTYPE,
593 self.ptype()
594 )
595 }
596 Buffer::from_byte_buffer(self.buffer.into_host_sync())
597 }
598
599 pub fn into_buffer_mut<T: NativePType>(self) -> BufferMut<T> {
602 self.try_into_buffer_mut()
603 .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
604 }
605
606 pub fn try_into_buffer_mut<T: NativePType>(self) -> Result<BufferMut<T>, Buffer<T>> {
608 if T::PTYPE != self.ptype() {
609 vortex_panic!(
610 "Attempted to get buffer_mut of type {} from array of type {}",
611 T::PTYPE,
612 self.ptype()
613 )
614 }
615 let buffer = Buffer::<T>::from_byte_buffer(self.buffer.into_host_sync());
616 buffer.try_into_mut()
617 }
618}