1use std::any::type_name;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::hash::Hasher;
8use std::ops::Range;
9use std::sync::Arc;
10
11use vortex_buffer::ByteBuffer;
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_error::vortex_ensure;
15use vortex_error::vortex_err;
16use vortex_error::vortex_panic;
17use vortex_mask::Mask;
18
19use crate::AnyCanonical;
20use crate::Array;
21use crate::ArrayEq;
22use crate::ArrayHash;
23use crate::ArrayView;
24use crate::Canonical;
25use crate::ExecutionCtx;
26use crate::ExecutionResult;
27use crate::IntoArray;
28use crate::LEGACY_SESSION;
29use crate::VTable;
30use crate::VortexSessionExecute;
31use crate::aggregate_fn::fns::sum::sum;
32use crate::array::ArrayId;
33use crate::array::ArrayInner;
34use crate::array::DynArray;
35use crate::arrays::Bool;
36use crate::arrays::Constant;
37use crate::arrays::DictArray;
38use crate::arrays::FilterArray;
39use crate::arrays::Null;
40use crate::arrays::Primitive;
41use crate::arrays::SliceArray;
42use crate::arrays::VarBin;
43use crate::arrays::VarBinView;
44use crate::buffer::BufferHandle;
45use crate::builders::ArrayBuilder;
46use crate::dtype::DType;
47use crate::dtype::Nullability;
48use crate::expr::stats::Precision;
49use crate::expr::stats::Stat;
50use crate::expr::stats::StatsProviderExt;
51use crate::matcher::Matcher;
52use crate::optimizer::ArrayOptimizer;
53use crate::scalar::Scalar;
54use crate::stats::StatsSetRef;
55use crate::validity::Validity;
56
/// An iterator that walks an array tree in depth-first (pre-order) order,
/// yielding each node before any of its children.
pub struct DepthFirstArrayIterator {
    // Explicit work stack of nodes still to visit; the next node to yield is
    // always on top, so traversal needs no recursion.
    stack: Vec<ArrayRef>,
}
61
62impl Iterator for DepthFirstArrayIterator {
63 type Item = ArrayRef;
64
65 fn next(&mut self) -> Option<Self::Item> {
66 let next = self.stack.pop()?;
67 for child in next.children().into_iter().rev() {
68 self.stack.push(child);
69 }
70 Some(next)
71 }
72}
73
/// A cheaply-cloneable, reference-counted handle to a type-erased array.
///
/// Cloning bumps the `Arc` refcount; the underlying array data is shared.
#[derive(Clone)]
pub struct ArrayRef(Arc<dyn DynArray>);
77
impl ArrayRef {
    /// Wraps a type-erased array implementation in an `ArrayRef`.
    pub(crate) fn from_inner(inner: Arc<dyn DynArray>) -> Self {
        Self(inner)
    }

    /// Returns the address of the shared allocation, usable as an identity key.
    ///
    /// Hidden from docs: this is an implementation detail, not a stable API.
    #[doc(hidden)]
    pub fn addr(&self) -> usize {
        Arc::as_ptr(&self.0).addr()
    }

    /// Borrows the inner reference-counted pointer.
    #[inline(always)]
    pub(crate) fn inner(&self) -> &Arc<dyn DynArray> {
        &self.0
    }

    /// Mutably borrows the inner reference-counted pointer.
    #[inline(always)]
    pub(crate) fn inner_mut(&mut self) -> &mut Arc<dyn DynArray> {
        &mut self.0
    }

    /// Consumes this handle, returning the inner reference-counted pointer.
    #[inline(always)]
    pub(crate) fn into_inner(self) -> Arc<dyn DynArray> {
        self.0
    }

    /// Returns true iff both handles point at the same underlying allocation
    /// (pointer identity, not structural equality).
    pub fn ptr_eq(this: &ArrayRef, other: &ArrayRef) -> bool {
        Arc::ptr_eq(&this.0, &other.0)
    }
}
114
115impl Debug for ArrayRef {
116 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
117 Debug::fmt(&*self.0, f)
118 }
119}
120
impl ArrayHash for ArrayRef {
    /// Hashes the array at the requested precision.
    ///
    /// The generic `H: Hasher` is erased to `&mut dyn Hasher` so the call can
    /// be dispatched through the object-safe `dyn_array_hash` method.
    fn array_hash<H: Hasher>(&self, state: &mut H, precision: crate::Precision) {
        self.0.dyn_array_hash(state as &mut dyn Hasher, precision);
    }
}
126
impl ArrayEq for ArrayRef {
    /// Compares two arrays at the requested precision by delegating to the
    /// object-safe `dyn_array_eq` on the underlying implementation.
    fn array_eq(&self, other: &Self, precision: crate::Precision) -> bool {
        self.0.dyn_array_eq(other, precision)
    }
}
impl ArrayRef {
    /// Returns the number of rows in the array.
    #[inline]
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Returns true iff the array has zero rows.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.0.len() == 0
    }

    /// Returns the logical data type of the array.
    #[inline]
    pub fn dtype(&self) -> &DType {
        self.0.dtype()
    }

    /// Returns the identifier of this array's encoding.
    #[inline]
    pub fn encoding_id(&self) -> ArrayId {
        self.0.encoding_id()
    }

    /// Returns a zero-copy slice of this array over `range`.
    ///
    /// Fast paths: the full range returns a clone of `self`, and an empty
    /// range returns an empty canonical array. Otherwise a `SliceArray`
    /// wrapper is built and immediately optimized.
    ///
    /// # Errors
    ///
    /// Returns an error if `range` is out of bounds or inverted
    /// (`start > stop`).
    pub fn slice(&self, range: Range<usize>) -> VortexResult<ArrayRef> {
        let len = self.len();
        let start = range.start;
        let stop = range.end;

        // Slicing the entire array is the identity operation.
        if start == 0 && stop == len {
            return Ok(self.clone());
        }

        vortex_ensure!(start <= len, "OutOfBounds: start {start} > length {}", len);
        vortex_ensure!(stop <= len, "OutOfBounds: stop {stop} > length {}", len);

        vortex_ensure!(start <= stop, "start ({start}) must be <= stop ({stop})");

        // An empty range yields an empty canonical array of the same dtype.
        if start == stop {
            return Ok(Canonical::empty(self.dtype()).into_array());
        }

        let sliced = SliceArray::try_new(self.clone(), range)?
            .into_array()
            .optimize()?;

        // Certain boolean stats are preserved by slicing when they are exactly
        // `true` on the parent: a constant/sorted/strictly-sorted array stays
        // so in any contiguous sub-range. Constant results already carry their
        // own stats, so skip those.
        if !sliced.is::<Constant>() {
            self.statistics().with_iter(|iter| {
                sliced.statistics().inherit(iter.filter(|(stat, value)| {
                    matches!(
                        stat,
                        Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted
                    ) && value.as_ref().as_exact().is_some_and(|v| {
                        Scalar::try_new(DType::Bool(Nullability::NonNullable), Some(v.clone()))
                            .vortex_expect("A stat that was expected to be a boolean stat was not")
                            .as_bool()
                            .value()
                            .unwrap_or_default()
                    })
                }));
            });
        }

        Ok(sliced)
    }

    /// Filters this array down to the rows selected by `mask`, represented
    /// lazily as a `FilterArray` and then optimized.
    pub fn filter(&self, mask: Mask) -> VortexResult<ArrayRef> {
        FilterArray::try_new(self.clone(), mask)?
            .into_array()
            .optimize()
    }

    /// Takes rows of this array at the given `indices`, represented lazily as
    /// a `DictArray` (indices into `self` as values) and then optimized.
    pub fn take(&self, indices: ArrayRef) -> VortexResult<ArrayRef> {
        DictArray::try_new(indices, self.clone())?
            .into_array()
            .optimize()
    }

    /// Fetches the scalar value at `index` using a freshly created legacy
    /// execution context.
    #[deprecated(
        note = "Use `execute_scalar` instead, which allows passing an execution context for more \
                efficient execution when fetching multiple scalars from the same array."
    )]
    pub fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
        self.execute_scalar(index, &mut LEGACY_SESSION.create_execution_ctx())
    }

    /// Fetches the scalar value at `index` within the given execution context.
    ///
    /// Null values short-circuit to a typed null scalar without invoking the
    /// encoding.
    ///
    /// # Errors
    ///
    /// Returns an error if `index` is out of bounds or execution fails.
    pub fn execute_scalar(&self, index: usize, ctx: &mut ExecutionCtx) -> VortexResult<Scalar> {
        vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
        if self.dtype().is_nullable() && self.is_invalid(index, ctx)? {
            return Ok(Scalar::null(self.dtype().clone()));
        }
        let scalar = self.0.execute_scalar(self, index, ctx)?;
        debug_assert_eq!(self.dtype(), scalar.dtype(), "Scalar dtype mismatch");
        Ok(scalar)
    }

    /// Returns whether the value at `index` is valid (non-null).
    ///
    /// # Errors
    ///
    /// Returns an error if `index` is out of bounds, validity cannot be
    /// computed, or the validity array itself contains a null at `index`.
    pub fn is_valid(&self, index: usize, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
        vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
        match self.validity()? {
            Validity::NonNullable | Validity::AllValid => Ok(true),
            Validity::AllInvalid => Ok(false),
            Validity::Array(a) => a
                .execute_scalar(index, ctx)?
                .as_bool()
                .value()
                // The validity array must itself be non-null per element.
                .ok_or_else(|| vortex_err!("validity value at index {} is null", index)),
        }
    }

    /// Returns whether the value at `index` is invalid (null); the negation
    /// of [`Self::is_valid`].
    pub fn is_invalid(&self, index: usize, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
        Ok(!self.is_valid(index, ctx)?)
    }

    /// Returns whether every element of the array is valid.
    ///
    /// For an explicit validity array this holds iff its minimum is `true`;
    /// when the minimum cannot be computed this conservatively returns
    /// `false`.
    pub fn all_valid(&self, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
        match self.validity()? {
            Validity::NonNullable | Validity::AllValid => Ok(true),
            Validity::AllInvalid => Ok(false),
            Validity::Array(a) => Ok(a.statistics().compute_min::<bool>(ctx).unwrap_or(false)),
        }
    }

    /// Returns whether every element of the array is invalid (null).
    ///
    /// For an explicit validity array this holds iff its maximum is `false`;
    /// when the maximum cannot be computed this conservatively returns
    /// `false` (the `unwrap_or(true)` is negated).
    pub fn all_invalid(&self, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
        match self.validity()? {
            Validity::NonNullable | Validity::AllValid => Ok(false),
            Validity::AllInvalid => Ok(true),
            Validity::Array(a) => Ok(!a.statistics().compute_max::<bool>(ctx).unwrap_or(true)),
        }
    }

    /// Returns the number of valid (non-null) elements.
    ///
    /// Uses an exact `NullCount` statistic when present; otherwise sums the
    /// validity array (true = 1) and caches the resulting `NullCount` back
    /// onto this array's statistics.
    pub fn valid_count(&self, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
        let len = self.len();
        // Fast path: an exact null count lets us answer without execution.
        if let Some(Precision::Exact(invalid_count)) =
            self.statistics().get_as::<usize>(Stat::NullCount)
        {
            return Ok(len - invalid_count);
        }

        let count = match self.validity()? {
            Validity::NonNullable | Validity::AllValid => len,
            Validity::AllInvalid => 0,
            Validity::Array(a) => {
                // Summing a boolean validity array counts its `true` entries.
                let array_sum = sum(&a, ctx)?;
                array_sum
                    .as_primitive()
                    .as_::<usize>()
                    .ok_or_else(|| vortex_err!("sum of validity array is null"))?
            }
        };
        vortex_ensure!(count <= len, "Valid count exceeds array length");

        // Cache the derived null count so later calls take the fast path.
        self.statistics()
            .set(Stat::NullCount, Precision::exact(len - count));

        Ok(count)
    }

    /// Returns the number of invalid (null) elements; the complement of
    /// [`Self::valid_count`].
    pub fn invalid_count(&self, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
        Ok(self.len() - self.valid_count(ctx)?)
    }

    /// Returns the validity of this array as reported by its encoding.
    pub fn validity(&self) -> VortexResult<Validity> {
        self.0.validity(self)
    }

    /// Decodes this array into its canonical in-memory form, consuming it.
    #[deprecated(note = "use `array.execute::<Canonical>(ctx)` instead")]
    pub fn into_canonical(self) -> VortexResult<Canonical> {
        self.execute(&mut LEGACY_SESSION.create_execution_ctx())
    }

    /// Decodes this array into its canonical form without consuming it
    /// (clones the handle first).
    #[deprecated(note = "use `array.execute::<Canonical>(ctx)` instead")]
    pub fn to_canonical(&self) -> VortexResult<Canonical> {
        // We knowingly call the deprecated sibling rather than duplicate it.
        #[expect(deprecated)]
        let result = self.clone().into_canonical();
        result
    }

    /// Appends all elements of this array to `builder`.
    pub fn append_to_builder(
        &self,
        builder: &mut dyn ArrayBuilder,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<()> {
        self.0.append_to_builder(self, builder, ctx)
    }

    /// Returns a reference to this array's statistics set, bound to `self`.
    pub fn statistics(&self) -> StatsSetRef<'_> {
        self.0.statistics().to_ref(self)
    }

    /// Returns true iff this array matches the given [`Matcher`]
    /// (e.g. a specific encoding).
    pub fn is<M: Matcher>(&self) -> bool {
        M::matches(self)
    }

    /// Matches this array against `M`, panicking on failure.
    ///
    /// # Panics
    ///
    /// Panics if the array does not match; prefer [`Self::as_opt`] when the
    /// match may fail.
    pub fn as_<M: Matcher>(&self) -> M::Match<'_> {
        self.as_opt::<M>().vortex_expect("Failed to downcast")
    }

    /// Matches this array against `M`, returning `None` on failure.
    pub fn as_opt<M: Matcher>(&self) -> Option<M::Match<'_>> {
        M::try_match(self)
    }

    /// Attempts to downcast this handle into a strongly-typed `Array<V>`,
    /// returning the original handle on failure.
    pub fn try_downcast<V: VTable>(self) -> Result<Array<V>, ArrayRef> {
        Array::<V>::try_from_array_ref(self)
    }

    /// Downcasts this handle into a strongly-typed `Array<V>`.
    ///
    /// # Panics
    ///
    /// Panics if the array's encoding is not `V`.
    pub fn downcast<V: VTable>(self) -> Array<V> {
        Self::try_downcast(self)
            .unwrap_or_else(|_| vortex_panic!("Failed to downcast to {}", type_name::<V>()))
    }

    /// Returns a borrowed, strongly-typed view over this array if its
    /// encoding is `V`.
    pub fn as_typed<V: VTable>(&self) -> Option<ArrayView<'_, V>> {
        let inner = self.0.as_any().downcast_ref::<ArrayInner<V>>()?;
        // SAFETY: `inner` is borrowed from `self`'s own allocation, so the
        // view's data lives exactly as long as the borrowed `self`.
        Some(unsafe { ArrayView::new_unchecked(self, &inner.data) })
    }

    /// Returns the scalar value if this array is a `Constant` encoding.
    pub fn as_constant(&self) -> Option<Scalar> {
        self.as_opt::<Constant>().map(|a| a.scalar().clone())
    }

    /// Returns the total byte size of all buffers in this array tree.
    ///
    /// Note: shared buffers are counted once per reference, not deduplicated.
    pub fn nbytes(&self) -> u64 {
        let mut nbytes = 0;
        for array in self.depth_first_traversal() {
            for buffer in array.buffers() {
                nbytes += buffer.len() as u64;
            }
        }
        nbytes
    }

    /// Returns true iff this array's encoding maps directly onto an Arrow
    /// layout (null, bool, primitive, varbin, or varbinview).
    pub fn is_arrow(&self) -> bool {
        self.is::<Null>()
            || self.is::<Bool>()
            || self.is::<Primitive>()
            || self.is::<VarBin>()
            || self.is::<VarBinView>()
    }

    /// Returns true iff this array is in a canonical encoding.
    pub fn is_canonical(&self) -> bool {
        self.is::<AnyCanonical>()
    }

    /// Returns a copy of this array with slot `slot_idx` replaced by
    /// `replacement`.
    ///
    /// # Errors
    ///
    /// Returns an error if `slot_idx` is out of bounds, or if the
    /// replacement's dtype or length differs from the existing slot's.
    ///
    /// # Panics
    ///
    /// Panics if the targeted slot is absent (`None`).
    pub fn with_slot(self, slot_idx: usize, replacement: ArrayRef) -> VortexResult<ArrayRef> {
        let slots = self.slots().to_vec();
        let nslots = slots.len();
        vortex_ensure!(
            slot_idx < nslots,
            "slot index {} out of bounds for array with {} slots",
            slot_idx,
            nslots
        );
        let existing = slots[slot_idx]
            .as_ref()
            .vortex_expect("with_slot cannot replace an absent slot");
        // The replacement must be a drop-in: same dtype and same length.
        vortex_ensure!(
            existing.dtype() == replacement.dtype(),
            "slot {} dtype changed from {} to {} during physical rewrite",
            slot_idx,
            existing.dtype(),
            replacement.dtype()
        );
        vortex_ensure!(
            existing.len() == replacement.len(),
            "slot {} len changed from {} to {} during physical rewrite",
            slot_idx,
            existing.len(),
            replacement.len()
        );
        let mut slots = slots;
        slots[slot_idx] = Some(replacement);
        self.with_slots(slots)
    }

    /// Removes and returns the child at `slot_idx`, along with a parent whose
    /// slot is left empty (`None`).
    ///
    /// When this handle is the sole owner, the slot is taken in place;
    /// otherwise the parent is rebuilt with the slot cleared.
    ///
    /// # Safety
    ///
    /// Bypasses the slot-invariant checks of [`Self::with_slots`]; the caller
    /// must restore the slot (or otherwise re-establish invariants) before
    /// the parent is used normally.
    ///
    /// # Panics
    ///
    /// Panics if the targeted slot is absent.
    pub(crate) unsafe fn take_slot_unchecked(
        mut self,
        slot_idx: usize,
    ) -> VortexResult<(ArrayRef, ArrayRef)> {
        // Fast path: unique ownership lets us mutate the slots in place.
        if let Some(inner) = Arc::get_mut(&mut self.0) {
            // SAFETY: forwarded from this fn's contract — the caller accepts
            // responsibility for the temporarily-absent slot.
            let child = unsafe { inner.slots_mut()[slot_idx].take() }
                .vortex_expect("take_slot_unchecked cannot take an absent slot");
            return Ok((self, child));
        }

        // Shared: clone the child handle and rebuild the parent with the
        // slot cleared.
        let child = self.slots()[slot_idx]
            .as_ref()
            .vortex_expect("take_slot_unchecked cannot take an absent slot")
            .clone();

        let mut new_slots = self.slots().to_vec();
        new_slots[slot_idx] = None;

        // SAFETY: forwarded from this fn's contract (unchecked slot rewrite).
        let new_parent = unsafe { self.0.with_slots_unchecked(&self, new_slots) };
        Ok((new_parent, child))
    }

    /// Stores `replacement` into slot `slot_idx`, returning the updated
    /// parent. The counterpart to [`Self::take_slot_unchecked`].
    ///
    /// # Safety
    ///
    /// Bypasses the slot-invariant checks of [`Self::with_slots`]; the caller
    /// must guarantee the replacement upholds the encoding's slot invariants.
    pub(crate) unsafe fn put_slot_unchecked(
        mut self,
        slot_idx: usize,
        replacement: ArrayRef,
    ) -> VortexResult<ArrayRef> {
        // Fast path: unique ownership lets us write the slot in place.
        if let Some(inner) = Arc::get_mut(&mut self.0) {
            // SAFETY: forwarded from this fn's contract.
            unsafe { inner.slots_mut()[slot_idx] = Some(replacement) };
            return Ok(self);
        }

        // Shared: rebuild with the new slot vector.
        let mut slots = self.slots().to_vec();
        slots[slot_idx] = Some(replacement);
        // Clone the Arc so we can call `with_slots` on the inner value while
        // also moving `self` into the call.
        let inner = Arc::clone(&self.0);
        inner.with_slots(self, slots)
    }

    /// Returns a copy of this array with all slots replaced by `slots`.
    ///
    /// # Errors
    ///
    /// Returns an error if the slot count changes, if any slot's presence
    /// (`Some`/`None`) changes, or if any present slot's dtype or length
    /// differs from the existing one.
    pub fn with_slots(self, slots: Vec<Option<ArrayRef>>) -> VortexResult<ArrayRef> {
        let old_slots = self.slots();
        vortex_ensure!(
            old_slots.len() == slots.len(),
            "slot count changed from {} to {} during physical rewrite",
            old_slots.len(),
            slots.len()
        );
        // Every slot must be a drop-in replacement for its predecessor.
        for (idx, (old_slot, new_slot)) in old_slots.iter().zip(slots.iter()).enumerate() {
            vortex_ensure!(
                old_slot.is_some() == new_slot.is_some(),
                "slot {} presence changed during physical rewrite",
                idx
            );
            if let (Some(old_slot), Some(new_slot)) = (old_slot.as_ref(), new_slot.as_ref()) {
                vortex_ensure!(
                    old_slot.dtype() == new_slot.dtype(),
                    "slot {} dtype changed from {} to {} during physical rewrite",
                    idx,
                    old_slot.dtype(),
                    new_slot.dtype()
                );
                vortex_ensure!(
                    old_slot.len() == new_slot.len(),
                    "slot {} len changed from {} to {} during physical rewrite",
                    idx,
                    old_slot.len(),
                    new_slot.len()
                );
            }
        }
        // Clone the Arc so we can call `with_slots` on the inner value while
        // also moving `self` into the call.
        let inner = Arc::clone(&self.0);
        inner.with_slots(self, slots)
    }

    /// Asks the encoding to reduce (simplify) this array, returning `None`
    /// when no reduction applies.
    pub fn reduce(&self) -> VortexResult<Option<ArrayRef>> {
        self.0.reduce(self)
    }

    /// Asks the encoding to reduce its parent given this array's position
    /// (`child_idx`) within it, returning `None` when no reduction applies.
    pub fn reduce_parent(
        &self,
        parent: &ArrayRef,
        child_idx: usize,
    ) -> VortexResult<Option<ArrayRef>> {
        self.0.reduce_parent(self, parent, child_idx)
    }

    /// Executes this array via its encoding, consuming it.
    pub(crate) fn execute_encoding(self, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
        // `execute` consumes `self`, so we cannot also hold `&self.0`; take a
        // raw pointer to the inner value first.
        let inner = Arc::as_ptr(&self.0);
        // SAFETY: the `Arc` inside `self` keeps `inner` alive for the whole
        // call — `self` is moved *into* `execute`, so the allocation cannot
        // be dropped before the call begins.
        unsafe { (&*inner).execute(self, ctx) }
    }

    /// Executes this array via its encoding without the usual checks,
    /// consuming it. See [`Self::execute_encoding`] for the pointer dance.
    pub(crate) fn execute_encoding_unchecked(
        self,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<ExecutionResult> {
        let inner = Arc::as_ptr(&self.0);
        // SAFETY: as in `execute_encoding`, the Arc owned by `self` keeps
        // `inner` alive until `execute_unchecked` takes ownership of `self`.
        let inner = unsafe { &*inner };
        // SAFETY: forwarded to the encoding's `execute_unchecked` contract.
        unsafe { inner.execute_unchecked(self, ctx) }
    }

    /// Asks the encoding to execute its parent given this array's position
    /// (`child_idx`) within it, returning `None` when not applicable.
    pub fn execute_parent(
        &self,
        parent: &ArrayRef,
        child_idx: usize,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<ArrayRef>> {
        self.0.execute_parent(self, parent, child_idx, ctx)
    }

    /// Returns the child arrays of this array.
    pub fn children(&self) -> Vec<ArrayRef> {
        self.0.children(self)
    }

    /// Returns the number of child arrays.
    pub fn nchildren(&self) -> usize {
        self.0.nchildren(self)
    }

    /// Returns the `idx`-th child, or `None` if out of range.
    pub fn nth_child(&self, idx: usize) -> Option<ArrayRef> {
        self.0.nth_child(self, idx)
    }

    /// Returns the names of this array's children.
    pub fn children_names(&self) -> Vec<String> {
        self.0.children_names(self)
    }

    /// Returns `(name, child)` pairs for this array's children.
    pub fn named_children(&self) -> Vec<(String, ArrayRef)> {
        self.0.named_children(self)
    }

    /// Returns this array's own byte buffers (not including children's).
    pub fn buffers(&self) -> Vec<ByteBuffer> {
        self.0.buffers(self)
    }

    /// Returns handles to this array's own buffers.
    pub fn buffer_handles(&self) -> Vec<BufferHandle> {
        self.0.buffer_handles(self)
    }

    /// Returns the names of this array's buffers.
    pub fn buffer_names(&self) -> Vec<String> {
        self.0.buffer_names(self)
    }

    /// Returns `(name, handle)` pairs for this array's buffers.
    pub fn named_buffers(&self) -> Vec<(String, BufferHandle)> {
        self.0.named_buffers(self)
    }

    /// Returns the number of buffers owned directly by this array.
    pub fn nbuffers(&self) -> usize {
        self.0.nbuffers(self)
    }

    /// Returns this array's slot vector (`None` marks an absent slot).
    pub fn slots(&self) -> &[Option<ArrayRef>] {
        self.0.slots()
    }

    /// Returns the name of slot `idx`.
    pub fn slot_name(&self, idx: usize) -> String {
        self.0.slot_name(self, idx)
    }

    /// Formats this array's encoding-specific metadata into `f`.
    pub fn metadata_fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.0.metadata_fmt(f)
    }

    /// Returns true iff every buffer in the whole array tree resides in host
    /// memory.
    pub fn is_host(&self) -> bool {
        for array in self.depth_first_traversal() {
            if !array.buffer_handles().iter().all(BufferHandle::is_on_host) {
                return false;
            }
        }
        true
    }

    /// Returns the total number of buffers in this array and all of its
    /// descendants.
    pub fn nbuffers_recursive(&self) -> usize {
        self.children()
            .iter()
            .map(|c| c.nbuffers_recursive())
            .sum::<usize>()
            + self.nbuffers()
    }

    /// Returns a depth-first (pre-order) iterator over this array tree,
    /// starting with `self`.
    pub fn depth_first_traversal(&self) -> DepthFirstArrayIterator {
        DepthFirstArrayIterator {
            stack: vec![self.clone()],
        }
    }
}
682
impl IntoArray for ArrayRef {
    /// Identity conversion: an `ArrayRef` is already an array handle.
    #[inline(always)]
    fn into_array(self) -> ArrayRef {
        self
    }
}
689
/// Every `VTable` is itself a `Matcher` that matches arrays of its own
/// encoding, yielding a typed borrowed view.
impl<V: VTable> Matcher for V {
    type Match<'a> = ArrayView<'a, V>;

    /// True iff `array`'s type-erased inner value is an `ArrayInner<V>`.
    fn matches(array: &ArrayRef) -> bool {
        array.0.as_any().is::<ArrayInner<V>>()
    }

    /// Downcasts to `ArrayInner<V>` and wraps the result in a typed view.
    fn try_match<'a>(array: &'a ArrayRef) -> Option<ArrayView<'a, V>> {
        let inner = array.0.as_any().downcast_ref::<ArrayInner<V>>()?;
        // SAFETY: `inner` is borrowed from `array`'s own allocation, so the
        // view's data lives exactly as long as the borrowed `array`.
        Some(unsafe { ArrayView::new_unchecked(array, &inner.data) })
    }
}