1use std::fmt::Display;
5use std::fmt::Formatter;
6use std::iter;
7
8use smallvec::smallvec;
9use vortex_buffer::Alignment;
10use vortex_buffer::Buffer;
11use vortex_buffer::BufferMut;
12use vortex_buffer::ByteBuffer;
13use vortex_buffer::ByteBufferMut;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_err;
17use vortex_error::vortex_panic;
18
19use crate::ArraySlots;
20use crate::ExecutionCtx;
21#[expect(deprecated)]
22use crate::ToCanonical as _;
23use crate::array::Array;
24use crate::array::ArrayParts;
25use crate::array::TypedArrayRef;
26use crate::arrays::Primitive;
27use crate::arrays::PrimitiveArray;
28use crate::dtype::DType;
29use crate::dtype::NativePType;
30use crate::dtype::Nullability;
31use crate::dtype::PType;
32use crate::match_each_native_ptype;
33use crate::validity::Validity;
34
35mod accessor;
36mod cast;
37mod conversion;
38mod patch;
39mod top_value;
40
41pub use patch::chunk_range;
42pub use patch::patch_chunk;
43
44use crate::ArrayRef;
45use crate::aggregate_fn::fns::min_max::min_max;
46use crate::array::child_to_validity;
47use crate::array::validity_to_child;
48use crate::arrays::bool::BoolArrayExt;
49use crate::buffer::BufferHandle;
50use crate::builtins::ArrayBuiltins;
51
52pub(super) const VALIDITY_SLOT: usize = 0;
54pub(super) const NUM_SLOTS: usize = 1;
55pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["validity"];
56
57#[derive(Clone, Debug)]
90pub struct PrimitiveData {
91 pub(super) ptype: PType,
92 pub(super) buffer: BufferHandle,
93}
94
95impl Display for PrimitiveData {
96 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
97 write!(f, "ptype: {}", self.ptype)
98 }
99}
100
101pub struct PrimitiveDataParts {
102 pub ptype: PType,
103 pub buffer: BufferHandle,
104 pub validity: Validity,
105}
106
107pub trait PrimitiveArrayExt: TypedArrayRef<Primitive> {
108 fn ptype(&self) -> PType {
109 match self.as_ref().dtype() {
110 DType::Primitive(ptype, _) => *ptype,
111 _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
112 }
113 }
114
115 fn nullability(&self) -> Nullability {
116 match self.as_ref().dtype() {
117 DType::Primitive(_, nullability) => *nullability,
118 _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
119 }
120 }
121
122 fn validity_child(&self) -> Option<&ArrayRef> {
123 self.as_ref().slots()[VALIDITY_SLOT].as_ref()
124 }
125
126 fn validity(&self) -> Validity {
127 child_to_validity(
128 self.as_ref().slots()[VALIDITY_SLOT].as_ref(),
129 self.nullability(),
130 )
131 }
132
133 fn buffer_handle(&self) -> &BufferHandle {
134 &self.buffer
135 }
136
137 fn reinterpret_cast(&self, ptype: PType) -> PrimitiveArray {
138 if self.ptype() == ptype {
139 return self.to_owned();
140 }
141
142 assert_eq!(
143 self.ptype().byte_width(),
144 ptype.byte_width(),
145 "can't reinterpret cast between integers of two different widths"
146 );
147
148 PrimitiveArray::from_buffer_handle(self.buffer_handle().clone(), ptype, self.validity())
149 }
150
151 fn narrow(&self, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
153 if !self.ptype().is_int() {
154 return Ok(self.to_owned());
155 }
156
157 let Some(min_max) = min_max(self.as_ref(), ctx)? else {
158 return Ok(PrimitiveArray::new(
159 Buffer::<u8>::zeroed(self.len()),
160 self.validity(),
161 ));
162 };
163
164 let Ok(min) = min_max
167 .min
168 .cast(&PType::I64.into())
169 .and_then(|s| i64::try_from(&s))
170 else {
171 return Ok(self.to_owned());
172 };
173 let Ok(max) = min_max
174 .max
175 .cast(&PType::I64.into())
176 .and_then(|s| i64::try_from(&s))
177 else {
178 return Ok(self.to_owned());
179 };
180
181 let nullability = self.as_ref().dtype().nullability();
182
183 if min < 0 || max < 0 {
184 if min >= i8::MIN as i64 && max <= i8::MAX as i64 {
186 let result = self
187 .as_ref()
188 .cast(DType::Primitive(PType::I8, nullability))?
189 .execute::<PrimitiveArray>(ctx)?;
190 return Ok(result);
191 }
192
193 if min >= i16::MIN as i64 && max <= i16::MAX as i64 {
194 let result = self
195 .as_ref()
196 .cast(DType::Primitive(PType::I16, nullability))?
197 .execute::<PrimitiveArray>(ctx)?;
198 return Ok(result);
199 }
200
201 if min >= i32::MIN as i64 && max <= i32::MAX as i64 {
202 let result = self
203 .as_ref()
204 .cast(DType::Primitive(PType::I32, nullability))?
205 .execute::<PrimitiveArray>(ctx)?;
206 return Ok(result);
207 }
208 } else {
209 if max <= u8::MAX as i64 {
211 let result = self
212 .as_ref()
213 .cast(DType::Primitive(PType::U8, nullability))?
214 .execute::<PrimitiveArray>(ctx)?;
215 return Ok(result);
216 }
217
218 if max <= u16::MAX as i64 {
219 let result = self
220 .as_ref()
221 .cast(DType::Primitive(PType::U16, nullability))?
222 .execute::<PrimitiveArray>(ctx)?;
223 return Ok(result);
224 }
225
226 if max <= u32::MAX as i64 {
227 let result = self
228 .as_ref()
229 .cast(DType::Primitive(PType::U32, nullability))?
230 .execute::<PrimitiveArray>(ctx)?;
231 return Ok(result);
232 }
233 }
234
235 Ok(self.to_owned())
236 }
237}
238impl<T: TypedArrayRef<Primitive>> PrimitiveArrayExt for T {}
239
240impl PrimitiveData {
242 pub(super) fn make_slots(validity: &Validity, len: usize) -> ArraySlots {
244 smallvec![validity_to_child(validity, len)]
245 }
246
247 pub unsafe fn new_unchecked_from_handle(
254 handle: BufferHandle,
255 ptype: PType,
256 _validity: Validity,
257 ) -> Self {
258 Self {
259 ptype,
260 buffer: handle,
261 }
262 }
263
264 pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
271 let buffer = buffer.into();
272 Self::try_new(buffer, validity).vortex_expect("PrimitiveArray construction failed")
273 }
274
275 #[inline]
284 pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
285 Self::validate(&buffer, &validity)?;
286
287 Ok(unsafe { Self::new_unchecked(buffer, validity) })
289 }
290
291 #[inline]
304 pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, _validity: Validity) -> Self {
305 #[cfg(debug_assertions)]
306 Self::validate(&buffer, &_validity)
307 .vortex_expect("[Debug Assertion]: Invalid `PrimitiveArray` parameters");
308
309 Self {
310 ptype: T::PTYPE,
311 buffer: BufferHandle::new_host(buffer.into_byte_buffer()),
312 }
313 }
314
315 #[inline]
319 pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
320 if let Some(len) = validity.maybe_len()
321 && buffer.len() != len
322 {
323 return Err(vortex_err!(
324 InvalidArgument:
325 "Buffer and validity length mismatch: buffer={}, validity={}",
326 buffer.len(),
327 len
328 ));
329 }
330 Ok(())
331 }
332
333 pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
334 Self::new(Buffer::<T>::empty(), nullability.into())
335 }
336}
337
338impl Array<Primitive> {
339 pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
340 let dtype = DType::Primitive(T::PTYPE, nullability);
341 let len = 0;
342 let data = PrimitiveData::empty::<T>(nullability);
343 let slots = PrimitiveData::make_slots(&Validity::from(nullability), len);
344 unsafe {
345 Array::from_parts_unchecked(
346 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
347 )
348 }
349 }
350
351 pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
357 let buffer = buffer.into();
358 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
359 let len = buffer.len();
360 let slots = PrimitiveData::make_slots(&validity, len);
361 let data = PrimitiveData::new(buffer, validity);
362 unsafe {
363 Array::from_parts_unchecked(
364 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
365 )
366 }
367 }
368
369 pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
371 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
372 let len = buffer.len();
373 let slots = PrimitiveData::make_slots(&validity, len);
374 let data = PrimitiveData::try_new(buffer, validity)?;
375 Ok(unsafe {
376 Array::from_parts_unchecked(
377 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
378 )
379 })
380 }
381
382 pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> Self {
388 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
389 let len = buffer.len();
390 let slots = PrimitiveData::make_slots(&validity, len);
391 let data = unsafe { PrimitiveData::new_unchecked(buffer, validity) };
392 unsafe {
393 Array::from_parts_unchecked(
394 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
395 )
396 }
397 }
398
399 pub unsafe fn new_unchecked_from_handle(
405 handle: BufferHandle,
406 ptype: PType,
407 validity: Validity,
408 ) -> Self {
409 let dtype = DType::Primitive(ptype, validity.nullability());
410 let len = handle.len() / ptype.byte_width();
411 let slots = PrimitiveData::make_slots(&validity, len);
412 let data = unsafe { PrimitiveData::new_unchecked_from_handle(handle, ptype, validity) };
413 unsafe {
414 Array::from_parts_unchecked(
415 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
416 )
417 }
418 }
419
420 pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, validity: Validity) -> Self {
422 let dtype = DType::Primitive(ptype, validity.nullability());
423 let len = handle.len() / ptype.byte_width();
424 let slots = PrimitiveData::make_slots(&validity, len);
425 let data = PrimitiveData::from_buffer_handle(handle, ptype, validity);
426 Array::try_from_parts(ArrayParts::new(Primitive, dtype, len, data).with_slots(slots))
427 .vortex_expect("PrimitiveData is always valid")
428 }
429
430 pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
432 let dtype = DType::Primitive(ptype, validity.nullability());
433 let len = buffer.len() / ptype.byte_width();
434 let slots = PrimitiveData::make_slots(&validity, len);
435 let data = PrimitiveData::from_byte_buffer(buffer, ptype, validity);
436 unsafe {
437 Array::from_parts_unchecked(
438 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
439 )
440 }
441 }
442
443 pub fn from_values_byte_buffer(
445 valid_elems_buffer: ByteBuffer,
446 ptype: PType,
447 validity: Validity,
448 n_rows: usize,
449 ) -> Self {
450 let dtype = DType::Primitive(ptype, validity.nullability());
451 let len = n_rows;
452 let slots = PrimitiveData::make_slots(&validity, len);
453 let data =
454 PrimitiveData::from_values_byte_buffer(valid_elems_buffer, ptype, validity, n_rows);
455 unsafe {
456 Array::from_parts_unchecked(
457 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
458 )
459 }
460 }
461
462 pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
464 PrimitiveData::validate(buffer, validity)
465 }
466
467 pub fn into_data_parts(self) -> PrimitiveDataParts {
468 let validity = PrimitiveArrayExt::validity(&self);
469 let ptype = PrimitiveArrayExt::ptype(&self);
470 let data = self.into_data();
471 PrimitiveDataParts {
472 ptype,
473 buffer: data.buffer,
474 validity,
475 }
476 }
477
478 pub fn map_each_with_validity<T, R, F>(self, f: F) -> VortexResult<Self>
479 where
480 T: NativePType,
481 R: NativePType,
482 F: FnMut((T, bool)) -> R,
483 {
484 let validity = PrimitiveArrayExt::validity(&self);
485 let data = self.into_data();
486 let buf_iter = data.to_buffer::<T>().into_iter();
487
488 let buffer = match &validity {
489 Validity::NonNullable | Validity::AllValid => {
490 BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(true)).map(f))
491 }
492 Validity::AllInvalid => {
493 BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(false)).map(f))
494 }
495 Validity::Array(val) => {
496 #[expect(deprecated)]
497 let val = val.to_bool().into_bit_buffer();
498 BufferMut::<R>::from_iter(buf_iter.zip(val.iter()).map(f))
499 }
500 };
501 Ok(PrimitiveArray::new(buffer.freeze(), validity))
502 }
503}
504
505impl PrimitiveData {
506 pub fn len(&self) -> usize {
507 self.buffer.len() / self.ptype.byte_width()
508 }
509
510 pub fn is_empty(&self) -> bool {
512 self.buffer.is_empty()
513 }
514
515 pub fn ptype(&self) -> PType {
516 self.ptype
517 }
518
519 pub fn buffer_handle(&self) -> &BufferHandle {
521 &self.buffer
522 }
523
524 pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, _validity: Validity) -> Self {
525 Self {
526 ptype,
527 buffer: handle,
528 }
529 }
530
531 pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
532 match_each_native_ptype!(ptype, |T| {
533 Self::new::<T>(Buffer::from_byte_buffer(buffer), validity)
534 })
535 }
536
537 pub fn from_values_byte_buffer(
539 valid_elems_buffer: ByteBuffer,
540 ptype: PType,
541 validity: Validity,
542 n_rows: usize,
543 ) -> Self {
544 let byte_width = ptype.byte_width();
545 let alignment = Alignment::new(byte_width);
546 let buffer = match &validity {
547 Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment),
548 Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment),
549 Validity::Array(is_valid) => {
550 #[expect(deprecated)]
551 let bool_array = is_valid.to_bool();
552 let bool_buffer = bool_array.to_bit_buffer();
553 let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment);
554 for (i, valid_i) in bool_buffer.set_indices().enumerate() {
555 bytes[valid_i * byte_width..(valid_i + 1) * byte_width]
556 .copy_from_slice(&valid_elems_buffer[i * byte_width..(i + 1) * byte_width])
557 }
558 bytes.freeze()
559 }
560 };
561
562 Self::from_byte_buffer(buffer, ptype, validity)
563 }
564
565 pub fn to_buffer<T: NativePType>(&self) -> Buffer<T> {
569 if T::PTYPE != self.ptype() {
570 vortex_panic!(
571 "Attempted to get buffer of type {} from array of type {}",
572 T::PTYPE,
573 self.ptype()
574 )
575 }
576 Buffer::from_byte_buffer(self.buffer_handle().to_host_sync())
577 }
578
579 pub fn into_buffer<T: NativePType>(self) -> Buffer<T> {
581 if T::PTYPE != self.ptype() {
582 vortex_panic!(
583 "Attempted to get buffer of type {} from array of type {}",
584 T::PTYPE,
585 self.ptype()
586 )
587 }
588 Buffer::from_byte_buffer(self.buffer.into_host_sync())
589 }
590
591 pub fn into_buffer_mut<T: NativePType>(self) -> BufferMut<T> {
594 self.try_into_buffer_mut()
595 .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
596 }
597
598 pub fn try_into_buffer_mut<T: NativePType>(self) -> Result<BufferMut<T>, Buffer<T>> {
600 if T::PTYPE != self.ptype() {
601 vortex_panic!(
602 "Attempted to get buffer_mut of type {} from array of type {}",
603 T::PTYPE,
604 self.ptype()
605 )
606 }
607 let buffer = Buffer::<T>::from_byte_buffer(self.buffer.into_host_sync());
608 buffer.try_into_mut()
609 }
610}