1use std::fmt::Display;
5use std::fmt::Formatter;
6use std::iter;
7
8use vortex_buffer::Alignment;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_err;
16use vortex_error::vortex_panic;
17
18use crate::LEGACY_SESSION;
19#[expect(deprecated)]
20use crate::ToCanonical as _;
21use crate::VortexSessionExecute;
22use crate::array::Array;
23use crate::array::ArrayParts;
24use crate::array::TypedArrayRef;
25use crate::arrays::Primitive;
26use crate::arrays::PrimitiveArray;
27use crate::dtype::DType;
28use crate::dtype::NativePType;
29use crate::dtype::Nullability;
30use crate::dtype::PType;
31use crate::match_each_native_ptype;
32use crate::validity::Validity;
33
34mod accessor;
35mod cast;
36mod conversion;
37mod patch;
38mod top_value;
39
40pub use patch::chunk_range;
41pub use patch::patch_chunk;
42
43use crate::ArrayRef;
44use crate::aggregate_fn::fns::min_max::min_max;
45use crate::array::child_to_validity;
46use crate::array::validity_to_child;
47use crate::arrays::bool::BoolArrayExt;
48use crate::buffer::BufferHandle;
49use crate::builtins::ArrayBuiltins;
50
51pub(super) const VALIDITY_SLOT: usize = 0;
53pub(super) const NUM_SLOTS: usize = 1;
54pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["validity"];
55
56#[derive(Clone, Debug)]
89pub struct PrimitiveData {
90 pub(super) ptype: PType,
91 pub(super) buffer: BufferHandle,
92}
93
94impl Display for PrimitiveData {
95 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96 write!(f, "ptype: {}", self.ptype)
97 }
98}
99
100pub struct PrimitiveDataParts {
101 pub ptype: PType,
102 pub buffer: BufferHandle,
103 pub validity: Validity,
104}
105
106pub trait PrimitiveArrayExt: TypedArrayRef<Primitive> {
107 fn ptype(&self) -> PType {
108 match self.as_ref().dtype() {
109 DType::Primitive(ptype, _) => *ptype,
110 _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
111 }
112 }
113
114 fn nullability(&self) -> Nullability {
115 match self.as_ref().dtype() {
116 DType::Primitive(_, nullability) => *nullability,
117 _ => unreachable!("PrimitiveArrayExt requires a primitive dtype"),
118 }
119 }
120
121 fn validity_child(&self) -> Option<&ArrayRef> {
122 self.as_ref().slots()[VALIDITY_SLOT].as_ref()
123 }
124
125 fn validity(&self) -> Validity {
126 child_to_validity(
127 self.as_ref().slots()[VALIDITY_SLOT].as_ref(),
128 self.nullability(),
129 )
130 }
131
132 fn buffer_handle(&self) -> &BufferHandle {
133 &self.buffer
134 }
135
136 fn reinterpret_cast(&self, ptype: PType) -> PrimitiveArray {
137 if self.ptype() == ptype {
138 return self.to_owned();
139 }
140
141 assert_eq!(
142 self.ptype().byte_width(),
143 ptype.byte_width(),
144 "can't reinterpret cast between integers of two different widths"
145 );
146
147 PrimitiveArray::from_buffer_handle(self.buffer_handle().clone(), ptype, self.validity())
148 }
149
150 fn narrow(&self) -> VortexResult<PrimitiveArray> {
152 if !self.ptype().is_int() {
153 return Ok(self.to_owned());
154 }
155
156 let mut ctx = LEGACY_SESSION.create_execution_ctx();
157 let Some(min_max) = min_max(self.as_ref(), &mut ctx)? else {
158 return Ok(PrimitiveArray::new(
159 Buffer::<u8>::zeroed(self.len()),
160 self.validity(),
161 ));
162 };
163
164 let Ok(min) = min_max
167 .min
168 .cast(&PType::I64.into())
169 .and_then(|s| i64::try_from(&s))
170 else {
171 return Ok(self.to_owned());
172 };
173 let Ok(max) = min_max
174 .max
175 .cast(&PType::I64.into())
176 .and_then(|s| i64::try_from(&s))
177 else {
178 return Ok(self.to_owned());
179 };
180
181 let nullability = self.as_ref().dtype().nullability();
182
183 if min < 0 || max < 0 {
184 if min >= i8::MIN as i64 && max <= i8::MAX as i64 {
186 #[expect(deprecated)]
187 let result = self
188 .as_ref()
189 .cast(DType::Primitive(PType::I8, nullability))?
190 .to_primitive();
191 return Ok(result);
192 }
193
194 if min >= i16::MIN as i64 && max <= i16::MAX as i64 {
195 #[expect(deprecated)]
196 let result = self
197 .as_ref()
198 .cast(DType::Primitive(PType::I16, nullability))?
199 .to_primitive();
200 return Ok(result);
201 }
202
203 if min >= i32::MIN as i64 && max <= i32::MAX as i64 {
204 #[expect(deprecated)]
205 let result = self
206 .as_ref()
207 .cast(DType::Primitive(PType::I32, nullability))?
208 .to_primitive();
209 return Ok(result);
210 }
211 } else {
212 if max <= u8::MAX as i64 {
214 #[expect(deprecated)]
215 let result = self
216 .as_ref()
217 .cast(DType::Primitive(PType::U8, nullability))?
218 .to_primitive();
219 return Ok(result);
220 }
221
222 if max <= u16::MAX as i64 {
223 #[expect(deprecated)]
224 let result = self
225 .as_ref()
226 .cast(DType::Primitive(PType::U16, nullability))?
227 .to_primitive();
228 return Ok(result);
229 }
230
231 if max <= u32::MAX as i64 {
232 #[expect(deprecated)]
233 let result = self
234 .as_ref()
235 .cast(DType::Primitive(PType::U32, nullability))?
236 .to_primitive();
237 return Ok(result);
238 }
239 }
240
241 Ok(self.to_owned())
242 }
243}
244impl<T: TypedArrayRef<Primitive>> PrimitiveArrayExt for T {}
245
246impl PrimitiveData {
248 pub(super) fn make_slots(validity: &Validity, len: usize) -> Vec<Option<ArrayRef>> {
250 vec![validity_to_child(validity, len)]
251 }
252
253 pub unsafe fn new_unchecked_from_handle(
260 handle: BufferHandle,
261 ptype: PType,
262 _validity: Validity,
263 ) -> Self {
264 Self {
265 ptype,
266 buffer: handle,
267 }
268 }
269
270 pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
277 let buffer = buffer.into();
278 Self::try_new(buffer, validity).vortex_expect("PrimitiveArray construction failed")
279 }
280
281 #[inline]
290 pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
291 Self::validate(&buffer, &validity)?;
292
293 Ok(unsafe { Self::new_unchecked(buffer, validity) })
295 }
296
297 #[inline]
310 pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, _validity: Validity) -> Self {
311 #[cfg(debug_assertions)]
312 Self::validate(&buffer, &_validity)
313 .vortex_expect("[Debug Assertion]: Invalid `PrimitiveArray` parameters");
314
315 Self {
316 ptype: T::PTYPE,
317 buffer: BufferHandle::new_host(buffer.into_byte_buffer()),
318 }
319 }
320
321 #[inline]
325 pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
326 if let Some(len) = validity.maybe_len()
327 && buffer.len() != len
328 {
329 return Err(vortex_err!(
330 InvalidArgument:
331 "Buffer and validity length mismatch: buffer={}, validity={}",
332 buffer.len(),
333 len
334 ));
335 }
336 Ok(())
337 }
338
339 pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
340 Self::new(Buffer::<T>::empty(), nullability.into())
341 }
342}
343
344impl Array<Primitive> {
345 pub fn empty<T: NativePType>(nullability: Nullability) -> Self {
346 let dtype = DType::Primitive(T::PTYPE, nullability);
347 let len = 0;
348 let data = PrimitiveData::empty::<T>(nullability);
349 let slots = PrimitiveData::make_slots(&Validity::from(nullability), len);
350 unsafe {
351 Array::from_parts_unchecked(
352 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
353 )
354 }
355 }
356
357 pub fn new<T: NativePType>(buffer: impl Into<Buffer<T>>, validity: Validity) -> Self {
363 let buffer = buffer.into();
364 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
365 let len = buffer.len();
366 let slots = PrimitiveData::make_slots(&validity, len);
367 let data = PrimitiveData::new(buffer, validity);
368 unsafe {
369 Array::from_parts_unchecked(
370 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
371 )
372 }
373 }
374
375 pub fn try_new<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> VortexResult<Self> {
377 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
378 let len = buffer.len();
379 let slots = PrimitiveData::make_slots(&validity, len);
380 let data = PrimitiveData::try_new(buffer, validity)?;
381 Ok(unsafe {
382 Array::from_parts_unchecked(
383 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
384 )
385 })
386 }
387
388 pub unsafe fn new_unchecked<T: NativePType>(buffer: Buffer<T>, validity: Validity) -> Self {
394 let dtype = DType::Primitive(T::PTYPE, validity.nullability());
395 let len = buffer.len();
396 let slots = PrimitiveData::make_slots(&validity, len);
397 let data = unsafe { PrimitiveData::new_unchecked(buffer, validity) };
398 unsafe {
399 Array::from_parts_unchecked(
400 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
401 )
402 }
403 }
404
405 pub unsafe fn new_unchecked_from_handle(
411 handle: BufferHandle,
412 ptype: PType,
413 validity: Validity,
414 ) -> Self {
415 let dtype = DType::Primitive(ptype, validity.nullability());
416 let len = handle.len() / ptype.byte_width();
417 let slots = PrimitiveData::make_slots(&validity, len);
418 let data = unsafe { PrimitiveData::new_unchecked_from_handle(handle, ptype, validity) };
419 unsafe {
420 Array::from_parts_unchecked(
421 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
422 )
423 }
424 }
425
426 pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, validity: Validity) -> Self {
428 let dtype = DType::Primitive(ptype, validity.nullability());
429 let len = handle.len() / ptype.byte_width();
430 let slots = PrimitiveData::make_slots(&validity, len);
431 let data = PrimitiveData::from_buffer_handle(handle, ptype, validity);
432 Array::try_from_parts(ArrayParts::new(Primitive, dtype, len, data).with_slots(slots))
433 .vortex_expect("PrimitiveData is always valid")
434 }
435
436 pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
438 let dtype = DType::Primitive(ptype, validity.nullability());
439 let len = buffer.len() / ptype.byte_width();
440 let slots = PrimitiveData::make_slots(&validity, len);
441 let data = PrimitiveData::from_byte_buffer(buffer, ptype, validity);
442 unsafe {
443 Array::from_parts_unchecked(
444 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
445 )
446 }
447 }
448
449 pub fn from_values_byte_buffer(
451 valid_elems_buffer: ByteBuffer,
452 ptype: PType,
453 validity: Validity,
454 n_rows: usize,
455 ) -> Self {
456 let dtype = DType::Primitive(ptype, validity.nullability());
457 let len = n_rows;
458 let slots = PrimitiveData::make_slots(&validity, len);
459 let data =
460 PrimitiveData::from_values_byte_buffer(valid_elems_buffer, ptype, validity, n_rows);
461 unsafe {
462 Array::from_parts_unchecked(
463 ArrayParts::new(Primitive, dtype, len, data).with_slots(slots),
464 )
465 }
466 }
467
468 pub fn validate<T: NativePType>(buffer: &Buffer<T>, validity: &Validity) -> VortexResult<()> {
470 PrimitiveData::validate(buffer, validity)
471 }
472
473 pub fn into_data_parts(self) -> PrimitiveDataParts {
474 let validity = PrimitiveArrayExt::validity(&self);
475 let ptype = PrimitiveArrayExt::ptype(&self);
476 let data = self.into_data();
477 PrimitiveDataParts {
478 ptype,
479 buffer: data.buffer,
480 validity,
481 }
482 }
483
484 pub fn map_each_with_validity<T, R, F>(self, f: F) -> VortexResult<Self>
485 where
486 T: NativePType,
487 R: NativePType,
488 F: FnMut((T, bool)) -> R,
489 {
490 let validity = PrimitiveArrayExt::validity(&self);
491 let data = self.into_data();
492 let buf_iter = data.to_buffer::<T>().into_iter();
493
494 let buffer = match &validity {
495 Validity::NonNullable | Validity::AllValid => {
496 BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(true)).map(f))
497 }
498 Validity::AllInvalid => {
499 BufferMut::<R>::from_iter(buf_iter.zip(iter::repeat(false)).map(f))
500 }
501 Validity::Array(val) => {
502 #[expect(deprecated)]
503 let val = val.to_bool().into_bit_buffer();
504 BufferMut::<R>::from_iter(buf_iter.zip(val.iter()).map(f))
505 }
506 };
507 Ok(PrimitiveArray::new(buffer.freeze(), validity))
508 }
509}
510
511impl PrimitiveData {
512 pub fn len(&self) -> usize {
513 self.buffer.len() / self.ptype.byte_width()
514 }
515
516 pub fn is_empty(&self) -> bool {
518 self.buffer.is_empty()
519 }
520
521 pub fn ptype(&self) -> PType {
522 self.ptype
523 }
524
525 pub fn buffer_handle(&self) -> &BufferHandle {
527 &self.buffer
528 }
529
530 pub fn from_buffer_handle(handle: BufferHandle, ptype: PType, _validity: Validity) -> Self {
531 Self {
532 ptype,
533 buffer: handle,
534 }
535 }
536
537 pub fn from_byte_buffer(buffer: ByteBuffer, ptype: PType, validity: Validity) -> Self {
538 match_each_native_ptype!(ptype, |T| {
539 Self::new::<T>(Buffer::from_byte_buffer(buffer), validity)
540 })
541 }
542
543 pub fn from_values_byte_buffer(
545 valid_elems_buffer: ByteBuffer,
546 ptype: PType,
547 validity: Validity,
548 n_rows: usize,
549 ) -> Self {
550 let byte_width = ptype.byte_width();
551 let alignment = Alignment::new(byte_width);
552 let buffer = match &validity {
553 Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment),
554 Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment),
555 Validity::Array(is_valid) => {
556 #[expect(deprecated)]
557 let bool_array = is_valid.to_bool();
558 let bool_buffer = bool_array.to_bit_buffer();
559 let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment);
560 for (i, valid_i) in bool_buffer.set_indices().enumerate() {
561 bytes[valid_i * byte_width..(valid_i + 1) * byte_width]
562 .copy_from_slice(&valid_elems_buffer[i * byte_width..(i + 1) * byte_width])
563 }
564 bytes.freeze()
565 }
566 };
567
568 Self::from_byte_buffer(buffer, ptype, validity)
569 }
570
571 pub fn to_buffer<T: NativePType>(&self) -> Buffer<T> {
575 if T::PTYPE != self.ptype() {
576 vortex_panic!(
577 "Attempted to get buffer of type {} from array of type {}",
578 T::PTYPE,
579 self.ptype()
580 )
581 }
582 Buffer::from_byte_buffer(self.buffer_handle().to_host_sync())
583 }
584
585 pub fn into_buffer<T: NativePType>(self) -> Buffer<T> {
587 if T::PTYPE != self.ptype() {
588 vortex_panic!(
589 "Attempted to get buffer of type {} from array of type {}",
590 T::PTYPE,
591 self.ptype()
592 )
593 }
594 Buffer::from_byte_buffer(self.buffer.into_host_sync())
595 }
596
597 pub fn into_buffer_mut<T: NativePType>(self) -> BufferMut<T> {
600 self.try_into_buffer_mut()
601 .unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
602 }
603
604 pub fn try_into_buffer_mut<T: NativePType>(self) -> Result<BufferMut<T>, Buffer<T>> {
606 if T::PTYPE != self.ptype() {
607 vortex_panic!(
608 "Attempted to get buffer_mut of type {} from array of type {}",
609 T::PTYPE,
610 self.ptype()
611 )
612 }
613 let buffer = Buffer::<T>::from_byte_buffer(self.buffer.into_host_sync());
614 buffer.try_into_mut()
615 }
616}