1use std::any::type_name;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::hash::Hash;
8use std::hash::Hasher;
9use std::ops::Range;
10use std::sync::Arc;
11
12use vortex_buffer::ByteBuffer;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_ensure;
16use vortex_error::vortex_err;
17use vortex_error::vortex_panic;
18use vortex_mask::Mask;
19
20use crate::AnyCanonical;
21use crate::Array;
22use crate::ArrayEq;
23use crate::ArrayHash;
24use crate::ArrayView;
25use crate::Canonical;
26use crate::ExecutionCtx;
27use crate::ExecutionResult;
28use crate::IntoArray;
29use crate::LEGACY_SESSION;
30use crate::VTable;
31use crate::VortexSessionExecute;
32use crate::aggregate_fn::fns::sum::sum;
33use crate::array::ArrayData;
34use crate::array::ArrayId;
35use crate::array::ArrayInner;
36use crate::array::ArraySlots;
37use crate::array::DynArrayData;
38use crate::arrays::Bool;
39use crate::arrays::Constant;
40use crate::arrays::DictArray;
41use crate::arrays::FilterArray;
42use crate::arrays::Null;
43use crate::arrays::Primitive;
44use crate::arrays::SliceArray;
45use crate::arrays::VarBin;
46use crate::arrays::VarBinView;
47use crate::buffer::BufferHandle;
48use crate::builders::ArrayBuilder;
49use crate::dtype::DType;
50use crate::expr::stats::Precision;
51use crate::expr::stats::Stat;
52use crate::expr::stats::StatsProviderExt;
53use crate::matcher::Matcher;
54use crate::optimizer::ArrayOptimizer;
55use crate::scalar::Scalar;
56use crate::scalar::ScalarValue;
57use crate::stats::StatsSetRef;
58use crate::validity::Validity;
59
60pub struct DepthFirstArrayIterator {
62 stack: Vec<ArrayRef>,
63}
64
65impl Iterator for DepthFirstArrayIterator {
66 type Item = ArrayRef;
67
68 fn next(&mut self) -> Option<Self::Item> {
69 let next = self.stack.pop()?;
70 for child in next.children().into_iter().rev() {
71 self.stack.push(child);
72 }
73 Some(next)
74 }
75}
76
/// A reference-counted, type-erased handle to an array.
///
/// The handle wraps an [`Arc`] around an [`ArrayInner`] whose payload is stored as
/// `dyn DynArrayData`, so cloning an `ArrayRef` clones the `Arc` rather than the array itself.
#[derive(Clone)]
pub struct ArrayRef(Arc<ArrayInner<dyn DynArrayData>>);
84
85impl ArrayRef {
86 pub(crate) fn from_inner<D: DynArrayData>(inner: Arc<ArrayInner<D>>) -> Self {
88 Self(inner)
89 }
90
91 #[inline(always)]
93 pub(crate) fn dyn_array(&self) -> &dyn DynArrayData {
94 &self.0.data
95 }
96
97 #[inline(always)]
99 pub(crate) fn inner_mut(&mut self) -> Option<&mut ArrayInner<dyn DynArrayData>> {
100 Arc::get_mut(&mut self.0)
101 }
102
103 #[doc(hidden)]
106 pub fn addr(&self) -> usize {
107 Arc::as_ptr(&self.0).addr()
108 }
109
110 #[allow(dead_code)]
114 pub(crate) fn downcast_inner<V: VTable>(self) -> Result<Arc<ArrayInner<ArrayData<V>>>, Self> {
115 if self.0.data.as_any().is::<ArrayData<V>>() {
117 Ok(unsafe { self.downcast_inner_unchecked() })
118 } else {
119 Err(self)
120 }
121 }
122
123 #[inline(always)]
128 pub(crate) unsafe fn downcast_inner_unchecked<V: VTable>(
129 self,
130 ) -> Arc<ArrayInner<ArrayData<V>>> {
131 debug_assert!(self.0.data.as_any().is::<ArrayData<V>>());
132 let raw = Arc::into_raw(self.0);
136 unsafe { Arc::from_raw(raw.cast::<ArrayInner<ArrayData<V>>>()) }
138 }
139
140 pub fn ptr_eq(this: &ArrayRef, other: &ArrayRef) -> bool {
142 Arc::ptr_eq(&this.0, &other.0)
143 }
144}
145
146impl Debug for ArrayRef {
147 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
148 f.debug_struct("Array")
149 .field("encoding", &self.0.encoding_id)
150 .field("dtype", &self.0.dtype)
151 .field("len", &self.0.len)
152 .field("data", &self.0.data)
153 .finish()
154 }
155}
156
157impl ArrayHash for ArrayRef {
158 fn array_hash<H: Hasher>(&self, state: &mut H, accuracy: crate::EqMode) {
159 self.0.len.hash(state);
160 self.0.dtype.hash(state);
161 self.0.encoding_id.hash(state);
162 self.0.slots.len().hash(state);
163 for slot in &self.0.slots {
164 slot.array_hash(state, accuracy);
165 }
166 self.0
167 .data
168 .dyn_array_hash(state as &mut dyn Hasher, accuracy);
169 }
170}
171
172impl ArrayEq for ArrayRef {
173 fn array_eq(&self, other: &Self, accuracy: crate::EqMode) -> bool {
174 self.0.len == other.0.len
175 && self.0.dtype == other.0.dtype
176 && self.0.encoding_id == other.0.encoding_id
177 && self.0.slots.len() == other.0.slots.len()
178 && self
179 .0
180 .slots
181 .iter()
182 .zip(other.0.slots.iter())
183 .all(|(slot, other_slot)| slot.array_eq(other_slot, accuracy))
184 && self.0.data.dyn_array_eq(other, accuracy)
185 }
186}
187impl ArrayRef {
188 #[inline]
190 pub fn len(&self) -> usize {
191 self.0.len
192 }
193
194 #[inline]
196 pub fn is_empty(&self) -> bool {
197 self.0.len == 0
198 }
199
200 #[inline]
202 pub fn dtype(&self) -> &DType {
203 &self.0.dtype
204 }
205
206 #[inline]
208 pub fn encoding_id(&self) -> ArrayId {
209 self.0.encoding_id
210 }
211
212 pub fn slice(&self, range: Range<usize>) -> VortexResult<ArrayRef> {
214 let len = self.len();
215 let start = range.start;
216 let stop = range.end;
217
218 if start == 0 && stop == len {
219 return Ok(self.clone());
220 }
221
222 vortex_ensure!(start <= len, "OutOfBounds: start {start} > length {}", len);
223 vortex_ensure!(stop <= len, "OutOfBounds: stop {stop} > length {}", len);
224
225 vortex_ensure!(start <= stop, "start ({start}) must be <= stop ({stop})");
226
227 if start == stop {
228 return Ok(Canonical::empty(self.dtype()).into_array());
229 }
230
231 let sliced = SliceArray::try_new(self.clone(), range)?
232 .into_array()
233 .optimize()?;
234
235 if !sliced.is::<Constant>() {
237 self.statistics().with_iter(|iter| {
238 sliced.statistics().inherit(iter.filter(|(stat, value)| {
239 matches!(
240 stat,
241 Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted
242 ) && value
243 .as_ref()
244 .as_exact()
245 .is_some_and(|v| matches!(v, ScalarValue::Bool(true)))
246 }));
247 });
248 }
249
250 Ok(sliced)
251 }
252
253 pub fn filter(&self, mask: Mask) -> VortexResult<ArrayRef> {
255 FilterArray::try_new(self.clone(), mask)?
256 .into_array()
257 .optimize()
258 }
259
260 pub fn take(&self, indices: ArrayRef) -> VortexResult<ArrayRef> {
262 DictArray::try_new(indices, self.clone())?
263 .into_array()
264 .optimize()
265 }
266
267 #[deprecated(
269 note = "Use `execute_scalar` instead, which allows passing an execution context for more \
270 efficient execution when fetching multiple scalars from the same array."
271 )]
272 pub fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
273 self.execute_scalar(index, &mut LEGACY_SESSION.create_execution_ctx())
274 }
275
276 pub fn execute_scalar(&self, index: usize, ctx: &mut ExecutionCtx) -> VortexResult<Scalar> {
278 vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
279 if self.dtype().is_nullable() && self.is_invalid(index, ctx)? {
280 return Ok(Scalar::null(self.dtype().clone()));
281 }
282 let scalar = self.0.data.execute_scalar(self, index, ctx)?;
283 debug_assert_eq!(self.dtype(), scalar.dtype(), "Scalar dtype mismatch");
284 Ok(scalar)
285 }
286
287 pub fn is_valid(&self, index: usize, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
289 vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
290 match self.validity()? {
291 Validity::NonNullable | Validity::AllValid => Ok(true),
292 Validity::AllInvalid => Ok(false),
293 Validity::Array(a) => a
294 .execute_scalar(index, ctx)?
295 .as_bool()
296 .value()
297 .ok_or_else(|| vortex_err!("validity value at index {} is null", index)),
298 }
299 }
300
301 pub fn is_invalid(&self, index: usize, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
303 Ok(!self.is_valid(index, ctx)?)
304 }
305
306 pub fn all_valid(&self, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
308 match self.validity()? {
309 Validity::NonNullable | Validity::AllValid => Ok(true),
310 Validity::AllInvalid => Ok(false),
311 Validity::Array(a) => Ok(a.statistics().compute_min::<bool>(ctx).unwrap_or(false)),
312 }
313 }
314
315 pub fn all_invalid(&self, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
317 match self.validity()? {
318 Validity::NonNullable | Validity::AllValid => Ok(false),
319 Validity::AllInvalid => Ok(true),
320 Validity::Array(a) => Ok(!a.statistics().compute_max::<bool>(ctx).unwrap_or(true)),
321 }
322 }
323
324 pub fn valid_count(&self, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
326 let len = self.len();
327 if let Precision::Exact(invalid_count) = self.statistics().get_as::<usize>(Stat::NullCount)
328 {
329 return Ok(len - invalid_count);
330 }
331
332 let count = match self.validity()? {
333 Validity::NonNullable | Validity::AllValid => len,
334 Validity::AllInvalid => 0,
335 Validity::Array(a) => {
336 let array_sum = sum(&a, ctx)?;
337 array_sum
338 .as_primitive()
339 .as_::<usize>()
340 .ok_or_else(|| vortex_err!("sum of validity array is null"))?
341 }
342 };
343 vortex_ensure!(count <= len, "Valid count exceeds array length");
344
345 self.statistics()
346 .set(Stat::NullCount, Precision::exact(len - count));
347
348 Ok(count)
349 }
350
351 pub fn invalid_count(&self, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
353 Ok(self.len() - self.valid_count(ctx)?)
354 }
355
356 pub fn validity(&self) -> VortexResult<Validity> {
358 self.0.data.validity(self)
359 }
360
361 #[deprecated(note = "use `array.execute::<Canonical>(ctx)` instead")]
363 pub fn into_canonical(self) -> VortexResult<Canonical> {
364 self.execute(&mut LEGACY_SESSION.create_execution_ctx())
365 }
366
367 #[deprecated(note = "use `array.execute::<Canonical>(ctx)` instead")]
369 pub fn to_canonical(&self) -> VortexResult<Canonical> {
370 #[expect(deprecated)]
371 let result = self.clone().into_canonical();
372 result
373 }
374
375 pub fn append_to_builder(
377 &self,
378 builder: &mut dyn ArrayBuilder,
379 ctx: &mut ExecutionCtx,
380 ) -> VortexResult<()> {
381 self.0.data.append_to_builder(self, builder, ctx)
382 }
383
384 pub fn statistics(&self) -> StatsSetRef<'_> {
386 self.0.stats.to_ref(self)
387 }
388
389 #[inline]
391 pub fn is<M: Matcher>(&self) -> bool {
392 M::matches(self)
393 }
394
395 #[inline]
397 pub fn as_<M: Matcher>(&self) -> M::Match<'_> {
398 self.as_opt::<M>().vortex_expect("Failed to downcast")
399 }
400
401 #[inline]
403 pub fn as_opt<M: Matcher>(&self) -> Option<M::Match<'_>> {
404 M::try_match(self)
405 }
406
407 pub fn try_downcast<V: VTable>(self) -> Result<Array<V>, ArrayRef> {
409 Array::<V>::try_from_array_ref(self)
410 }
411
412 pub fn downcast<V: VTable>(self) -> Array<V> {
418 Self::try_downcast(self)
419 .unwrap_or_else(|_| vortex_panic!("Failed to downcast to {}", type_name::<V>()))
420 }
421
422 pub fn as_typed<V: VTable>(&self) -> Option<ArrayView<'_, V>> {
424 let inner = self.0.data.as_any().downcast_ref::<ArrayData<V>>()?;
425 Some(unsafe { ArrayView::new_unchecked(self, &inner.data) })
426 }
427
428 pub fn as_constant(&self) -> Option<Scalar> {
430 self.as_opt::<Constant>().map(|a| a.scalar().clone())
431 }
432
433 pub fn nbytes(&self) -> u64 {
435 let mut nbytes = 0;
436 for array in self.depth_first_traversal() {
437 for buffer in array.buffers() {
438 nbytes += buffer.len() as u64;
439 }
440 }
441 nbytes
442 }
443
444 pub fn is_arrow(&self) -> bool {
446 self.is::<Null>()
447 || self.is::<Bool>()
448 || self.is::<Primitive>()
449 || self.is::<VarBin>()
450 || self.is::<VarBinView>()
451 }
452
453 pub fn is_canonical(&self) -> bool {
455 self.is::<AnyCanonical>()
456 }
457
458 pub unsafe fn with_slot(
471 self,
472 slot_idx: usize,
473 replacement: ArrayRef,
474 ) -> VortexResult<ArrayRef> {
475 let mut slots: ArraySlots = self.slots().iter().cloned().collect();
476 let nslots = slots.len();
477 vortex_ensure!(
478 slot_idx < nslots,
479 "slot index {} out of bounds for array with {} slots",
480 slot_idx,
481 nslots
482 );
483 let existing = slots[slot_idx]
484 .as_ref()
485 .vortex_expect("with_slot cannot replace an absent slot");
486 vortex_ensure!(
487 existing.dtype() == replacement.dtype(),
488 "slot {} dtype changed from {} to {} during physical rewrite",
489 slot_idx,
490 existing.dtype(),
491 replacement.dtype()
492 );
493 vortex_ensure!(
494 existing.len() == replacement.len(),
495 "slot {} len changed from {} to {} during physical rewrite",
496 slot_idx,
497 existing.len(),
498 replacement.len()
499 );
500 slots[slot_idx] = Some(replacement);
501 unsafe { self.with_slots(slots) }
503 }
504
505 pub(crate) unsafe fn take_slot_unchecked(
517 mut self,
518 slot_idx: usize,
519 ) -> VortexResult<(ArrayRef, ArrayRef)> {
520 if let Some(inner) = Arc::get_mut(&mut self.0) {
521 let child = inner.slots[slot_idx]
522 .take()
523 .vortex_expect("take_slot_unchecked cannot take an absent slot");
524 return Ok((self, child));
525 }
526
527 let child = self.slots()[slot_idx]
530 .as_ref()
531 .vortex_expect("take_slot_unchecked cannot take an absent slot")
532 .clone();
533
534 let mut new_slots: ArraySlots = self.slots().iter().cloned().collect();
535 new_slots[slot_idx] = None;
536
537 let new_parent = unsafe { self.0.data.with_slots_unchecked(&self, new_slots) };
540 Ok((new_parent, child))
541 }
542
543 pub(crate) unsafe fn put_slot_unchecked(
551 mut self,
552 slot_idx: usize,
553 replacement: ArrayRef,
554 ) -> VortexResult<ArrayRef> {
555 if let Some(inner) = Arc::get_mut(&mut self.0) {
556 inner.slots[slot_idx] = Some(replacement);
557 return Ok(self);
558 }
559
560 let mut slots: ArraySlots = self.slots().iter().cloned().collect();
561 slots[slot_idx] = Some(replacement);
562 self.0.data.with_slots(&self, slots)
563 }
564
565 pub unsafe fn with_slots(self, slots: ArraySlots) -> VortexResult<ArrayRef> {
576 let old_slots = self.slots();
577 vortex_ensure!(
578 old_slots.len() == slots.len(),
579 "slot count changed from {} to {} during physical rewrite",
580 old_slots.len(),
581 slots.len()
582 );
583 for (idx, (old_slot, new_slot)) in old_slots.iter().zip(slots.iter()).enumerate() {
584 vortex_ensure!(
585 old_slot.is_some() == new_slot.is_some(),
586 "slot {} presence changed during physical rewrite",
587 idx
588 );
589 if let (Some(old_slot), Some(new_slot)) = (old_slot.as_ref(), new_slot.as_ref()) {
590 vortex_ensure!(
591 old_slot.dtype() == new_slot.dtype(),
592 "slot {} dtype changed from {} to {} during physical rewrite",
593 idx,
594 old_slot.dtype(),
595 new_slot.dtype()
596 );
597 vortex_ensure!(
598 old_slot.len() == new_slot.len(),
599 "slot {} len changed from {} to {} during physical rewrite",
600 idx,
601 old_slot.len(),
602 new_slot.len()
603 );
604 }
605 }
606 self.0.data.with_slots(&self, slots)
607 }
608
609 pub unsafe fn with_buffers(
622 self,
623 buffers: impl IntoIterator<Item = BufferHandle>,
624 ) -> VortexResult<ArrayRef> {
625 let buffers = buffers.into_iter().collect::<Vec<_>>();
626 let nbuffers = self.nbuffers();
627 vortex_ensure!(
628 nbuffers == buffers.len(),
629 "buffer count changed from {} to {} during physical rewrite",
630 nbuffers,
631 buffers.len()
632 );
633 for (idx, (old_buffer, new_buffer)) in self
634 .buffer_handles()
635 .into_iter()
636 .zip(buffers.iter())
637 .enumerate()
638 {
639 vortex_ensure!(
640 old_buffer.len() == new_buffer.len(),
641 "buffer {} length changed from {} to {} during physical rewrite",
642 idx,
643 old_buffer.len(),
644 new_buffer.len()
645 );
646 }
647 self.0.data.with_buffers(&self, buffers)
648 }
649
650 pub fn reduce(&self) -> VortexResult<Option<ArrayRef>> {
651 self.0.data.reduce(self)
652 }
653
654 pub fn reduce_parent(
655 &self,
656 parent: &ArrayRef,
657 child_idx: usize,
658 ) -> VortexResult<Option<ArrayRef>> {
659 self.0.data.reduce_parent(self, parent, child_idx)
660 }
661
662 pub(crate) fn execute_encoding(self, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
663 let inner = Arc::as_ptr(&self.0);
664 unsafe { (&*inner).data.execute(self, ctx) }
666 }
667
668 pub(crate) fn execute_encoding_unchecked(
674 self,
675 ctx: &mut ExecutionCtx,
676 ) -> VortexResult<ExecutionResult> {
677 let inner = Arc::as_ptr(&self.0);
678 unsafe { (&*inner).data.execute_unchecked(self, ctx) }
682 }
683
684 pub fn children(&self) -> Vec<ArrayRef> {
688 self.0.data.children(self)
689 }
690
691 pub fn nchildren(&self) -> usize {
693 self.0.data.nchildren(self)
694 }
695
696 pub fn nth_child(&self, idx: usize) -> Option<ArrayRef> {
698 self.0.data.nth_child(self, idx)
699 }
700
701 pub fn children_names(&self) -> Vec<String> {
703 self.0.data.children_names(self)
704 }
705
706 pub fn named_children(&self) -> Vec<(String, ArrayRef)> {
708 self.0.data.named_children(self)
709 }
710
711 pub fn buffers(&self) -> Vec<ByteBuffer> {
713 self.0.data.buffers(self)
714 }
715
716 pub fn buffer_handles(&self) -> Vec<BufferHandle> {
718 self.0.data.buffer_handles(self)
719 }
720
721 pub fn buffer_names(&self) -> Vec<String> {
723 self.0.data.buffer_names(self)
724 }
725
726 pub fn named_buffers(&self) -> Vec<(String, BufferHandle)> {
728 self.0.data.named_buffers(self)
729 }
730
731 pub fn nbuffers(&self) -> usize {
733 self.0.data.nbuffers(self)
734 }
735
736 pub fn slots(&self) -> &[Option<ArrayRef>] {
738 &self.0.slots
739 }
740
741 pub fn slot_name(&self, idx: usize) -> String {
743 self.0.data.slot_name(self, idx)
744 }
745
746 pub fn metadata_fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
748 self.0.data.metadata_fmt(f)
749 }
750
751 pub fn is_host(&self) -> bool {
753 for array in self.depth_first_traversal() {
754 if !array.buffer_handles().iter().all(BufferHandle::is_on_host) {
755 return false;
756 }
757 }
758 true
759 }
760
761 pub fn nbuffers_recursive(&self) -> usize {
765 self.children()
766 .iter()
767 .map(|c| c.nbuffers_recursive())
768 .sum::<usize>()
769 + self.nbuffers()
770 }
771
772 pub fn depth_first_traversal(&self) -> DepthFirstArrayIterator {
774 DepthFirstArrayIterator {
775 stack: vec![self.clone()],
776 }
777 }
778}
779
/// An `ArrayRef` already is an array handle, so the conversion is the identity.
impl IntoArray for ArrayRef {
    /// Returns `self` unchanged.
    #[inline(always)]
    fn into_array(self) -> ArrayRef {
        self
    }
}
786
787impl<V: VTable> Matcher for V {
788 type Match<'a> = ArrayView<'a, V>;
789
790 #[inline]
791 fn matches(array: &ArrayRef) -> bool {
792 array.0.data.as_any().is::<ArrayData<V>>()
793 }
794
795 #[inline]
796 fn try_match(array: &'_ ArrayRef) -> Option<ArrayView<'_, V>> {
797 let inner = array.0.data.as_any().downcast_ref::<ArrayData<V>>()?;
798 Some(unsafe { ArrayView::new_unchecked(array, &inner.data) })
800 }
801}