1use std::any::type_name;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::hash::Hasher;
8use std::ops::Range;
9use std::sync::Arc;
10
11use vortex_buffer::ByteBuffer;
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_error::vortex_ensure;
15use vortex_error::vortex_err;
16use vortex_error::vortex_panic;
17use vortex_mask::Mask;
18
19use crate::AnyCanonical;
20use crate::Array;
21use crate::ArrayEq;
22use crate::ArrayHash;
23use crate::ArrayView;
24use crate::Canonical;
25use crate::ExecutionCtx;
26use crate::IntoArray;
27use crate::LEGACY_SESSION;
28use crate::VTable;
29use crate::VortexSessionExecute;
30use crate::aggregate_fn::fns::sum::sum;
31use crate::array::ArrayId;
32use crate::array::ArrayInner;
33use crate::array::DynArray;
34use crate::arrays::Bool;
35use crate::arrays::Constant;
36use crate::arrays::DictArray;
37use crate::arrays::FilterArray;
38use crate::arrays::Null;
39use crate::arrays::Primitive;
40use crate::arrays::SliceArray;
41use crate::arrays::VarBin;
42use crate::arrays::VarBinView;
43use crate::buffer::BufferHandle;
44use crate::builders::ArrayBuilder;
45use crate::dtype::DType;
46use crate::dtype::Nullability;
47use crate::expr::stats::Precision;
48use crate::expr::stats::Stat;
49use crate::expr::stats::StatsProviderExt;
50use crate::matcher::Matcher;
51use crate::optimizer::ArrayOptimizer;
52use crate::scalar::Scalar;
53use crate::stats::StatsSetRef;
54use crate::validity::Validity;
55
/// Iterator over an array and all of its descendants.
///
/// Each array is yielded before its children (see the `Iterator` impl), and
/// children are visited in the order returned by [`ArrayRef::children`].
pub struct DepthFirstArrayIterator {
    /// Arrays that still have to be visited; the last element is yielded next.
    stack: Vec<ArrayRef>,
}
60
61impl Iterator for DepthFirstArrayIterator {
62 type Item = ArrayRef;
63
64 fn next(&mut self) -> Option<Self::Item> {
65 let next = self.stack.pop()?;
66 for child in next.children().into_iter().rev() {
67 self.stack.push(child);
68 }
69 Some(next)
70 }
71}
72
/// A shared, type-erased handle to an array.
///
/// This is a thin wrapper around `Arc<dyn DynArray>`, so cloning an `ArrayRef`
/// clones the `Arc` rather than the array it points to.
#[derive(Clone)]
pub struct ArrayRef(Arc<dyn DynArray>);
76
77impl ArrayRef {
78 pub(crate) fn from_inner(inner: Arc<dyn DynArray>) -> Self {
80 Self(inner)
81 }
82
83 #[doc(hidden)]
86 pub fn addr(&self) -> usize {
87 Arc::as_ptr(&self.0).addr()
88 }
89
90 #[inline(always)]
92 pub(crate) fn inner(&self) -> &Arc<dyn DynArray> {
93 &self.0
94 }
95
96 #[inline(always)]
98 pub(crate) fn into_inner(self) -> Arc<dyn DynArray> {
99 self.0
100 }
101
102 pub fn ptr_eq(this: &ArrayRef, other: &ArrayRef) -> bool {
104 Arc::ptr_eq(&this.0, &other.0)
105 }
106}
107
108impl Debug for ArrayRef {
109 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
110 Debug::fmt(&*self.0, f)
111 }
112}
113
114impl ArrayHash for ArrayRef {
115 fn array_hash<H: Hasher>(&self, state: &mut H, precision: crate::Precision) {
116 self.0.dyn_array_hash(state as &mut dyn Hasher, precision);
117 }
118}
119
120impl ArrayEq for ArrayRef {
121 fn array_eq(&self, other: &Self, precision: crate::Precision) -> bool {
122 self.0.dyn_array_eq(other, precision)
123 }
124}
125impl ArrayRef {
126 #[inline]
128 pub fn len(&self) -> usize {
129 self.0.len()
130 }
131
132 #[inline]
134 pub fn is_empty(&self) -> bool {
135 self.0.len() == 0
136 }
137
138 #[inline]
140 pub fn dtype(&self) -> &DType {
141 self.0.dtype()
142 }
143
144 #[inline]
146 pub fn encoding_id(&self) -> ArrayId {
147 self.0.encoding_id()
148 }
149
150 pub fn slice(&self, range: Range<usize>) -> VortexResult<ArrayRef> {
152 let len = self.len();
153 let start = range.start;
154 let stop = range.end;
155
156 if start == 0 && stop == len {
157 return Ok(self.clone());
158 }
159
160 vortex_ensure!(start <= len, "OutOfBounds: start {start} > length {}", len);
161 vortex_ensure!(stop <= len, "OutOfBounds: stop {stop} > length {}", len);
162
163 vortex_ensure!(start <= stop, "start ({start}) must be <= stop ({stop})");
164
165 if start == stop {
166 return Ok(Canonical::empty(self.dtype()).into_array());
167 }
168
169 let sliced = SliceArray::try_new(self.clone(), range)?
170 .into_array()
171 .optimize()?;
172
173 if !sliced.is::<Constant>() {
175 self.statistics().with_iter(|iter| {
176 sliced.statistics().inherit(iter.filter(|(stat, value)| {
177 matches!(
178 stat,
179 Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted
180 ) && value.as_ref().as_exact().is_some_and(|v| {
181 Scalar::try_new(DType::Bool(Nullability::NonNullable), Some(v.clone()))
182 .vortex_expect("A stat that was expected to be a boolean stat was not")
183 .as_bool()
184 .value()
185 .unwrap_or_default()
186 })
187 }));
188 });
189 }
190
191 Ok(sliced)
192 }
193
194 pub fn filter(&self, mask: Mask) -> VortexResult<ArrayRef> {
196 FilterArray::try_new(self.clone(), mask)?
197 .into_array()
198 .optimize()
199 }
200
201 pub fn take(&self, indices: ArrayRef) -> VortexResult<ArrayRef> {
203 DictArray::try_new(indices, self.clone())?
204 .into_array()
205 .optimize()
206 }
207
208 #[deprecated(
210 note = "Use `execute_scalar` instead, which allows passing an execution context for more \
211 efficient execution when fetching multiple scalars from the same array."
212 )]
213 pub fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
214 self.execute_scalar(index, &mut LEGACY_SESSION.create_execution_ctx())
215 }
216
217 pub fn execute_scalar(&self, index: usize, ctx: &mut ExecutionCtx) -> VortexResult<Scalar> {
219 vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
220 if self.is_invalid(index, ctx)? {
221 return Ok(Scalar::null(self.dtype().clone()));
222 }
223 let scalar = self.0.execute_scalar(self, index, ctx)?;
224 vortex_ensure!(self.dtype() == scalar.dtype(), "Scalar dtype mismatch");
225 Ok(scalar)
226 }
227
228 pub fn is_valid(&self, index: usize, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
230 vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
231 match self.validity()? {
232 Validity::NonNullable | Validity::AllValid => Ok(true),
233 Validity::AllInvalid => Ok(false),
234 Validity::Array(a) => a
235 .execute_scalar(index, ctx)?
236 .as_bool()
237 .value()
238 .ok_or_else(|| vortex_err!("validity value at index {} is null", index)),
239 }
240 }
241
242 pub fn is_invalid(&self, index: usize, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
244 Ok(!self.is_valid(index, ctx)?)
245 }
246
247 pub fn all_valid(&self, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
249 match self.validity()? {
250 Validity::NonNullable | Validity::AllValid => Ok(true),
251 Validity::AllInvalid => Ok(false),
252 Validity::Array(a) => Ok(a.statistics().compute_min::<bool>(ctx).unwrap_or(false)),
253 }
254 }
255
256 pub fn all_invalid(&self, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
258 match self.validity()? {
259 Validity::NonNullable | Validity::AllValid => Ok(false),
260 Validity::AllInvalid => Ok(true),
261 Validity::Array(a) => Ok(!a.statistics().compute_max::<bool>(ctx).unwrap_or(true)),
262 }
263 }
264
265 pub fn valid_count(&self, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
267 let len = self.len();
268 if let Some(Precision::Exact(invalid_count)) =
269 self.statistics().get_as::<usize>(Stat::NullCount)
270 {
271 return Ok(len - invalid_count);
272 }
273
274 let count = match self.validity()? {
275 Validity::NonNullable | Validity::AllValid => len,
276 Validity::AllInvalid => 0,
277 Validity::Array(a) => {
278 let array_sum = sum(&a, ctx)?;
279 array_sum
280 .as_primitive()
281 .as_::<usize>()
282 .ok_or_else(|| vortex_err!("sum of validity array is null"))?
283 }
284 };
285 vortex_ensure!(count <= len, "Valid count exceeds array length");
286
287 self.statistics()
288 .set(Stat::NullCount, Precision::exact(len - count));
289
290 Ok(count)
291 }
292
293 pub fn invalid_count(&self, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
295 Ok(self.len() - self.valid_count(ctx)?)
296 }
297
298 pub fn validity(&self) -> VortexResult<Validity> {
300 self.0.validity(self)
301 }
302
303 pub fn into_canonical(self) -> VortexResult<Canonical> {
305 self.execute(&mut LEGACY_SESSION.create_execution_ctx())
306 }
307
308 pub fn to_canonical(&self) -> VortexResult<Canonical> {
310 self.clone().into_canonical()
311 }
312
313 pub fn append_to_builder(
315 &self,
316 builder: &mut dyn ArrayBuilder,
317 ctx: &mut ExecutionCtx,
318 ) -> VortexResult<()> {
319 self.0.append_to_builder(self, builder, ctx)
320 }
321
322 pub fn statistics(&self) -> StatsSetRef<'_> {
324 self.0.statistics().to_ref(self)
325 }
326
327 pub fn is<M: Matcher>(&self) -> bool {
329 M::matches(self)
330 }
331
332 pub fn as_<M: Matcher>(&self) -> M::Match<'_> {
334 self.as_opt::<M>().vortex_expect("Failed to downcast")
335 }
336
337 pub fn as_opt<M: Matcher>(&self) -> Option<M::Match<'_>> {
339 M::try_match(self)
340 }
341
342 pub fn try_downcast<V: VTable>(self) -> Result<Array<V>, ArrayRef> {
344 Array::<V>::try_from_array_ref(self)
345 }
346
347 pub fn downcast<V: VTable>(self) -> Array<V> {
353 Self::try_downcast(self)
354 .unwrap_or_else(|_| vortex_panic!("Failed to downcast to {}", type_name::<V>()))
355 }
356
357 pub fn as_typed<V: VTable>(&self) -> Option<ArrayView<'_, V>> {
359 let inner = self.0.as_any().downcast_ref::<ArrayInner<V>>()?;
360 Some(unsafe { ArrayView::new_unchecked(self, &inner.data) })
361 }
362
363 pub fn as_constant(&self) -> Option<Scalar> {
365 self.as_opt::<Constant>().map(|a| a.scalar().clone())
366 }
367
368 pub fn nbytes(&self) -> u64 {
370 let mut nbytes = 0;
371 for array in self.depth_first_traversal() {
372 for buffer in array.buffers() {
373 nbytes += buffer.len() as u64;
374 }
375 }
376 nbytes
377 }
378
379 pub fn is_arrow(&self) -> bool {
381 self.is::<Null>()
382 || self.is::<Bool>()
383 || self.is::<Primitive>()
384 || self.is::<VarBin>()
385 || self.is::<VarBinView>()
386 }
387
388 pub fn is_canonical(&self) -> bool {
390 self.is::<AnyCanonical>()
391 }
392
393 pub fn with_slot(self, slot_idx: usize, replacement: ArrayRef) -> VortexResult<ArrayRef> {
400 let slots = self.slots().to_vec();
401 let nslots = slots.len();
402 vortex_ensure!(
403 slot_idx < nslots,
404 "slot index {} out of bounds for array with {} slots",
405 slot_idx,
406 nslots
407 );
408 let existing = slots[slot_idx]
409 .as_ref()
410 .vortex_expect("with_slot cannot replace an absent slot");
411 vortex_ensure!(
412 existing.dtype() == replacement.dtype(),
413 "slot {} dtype changed from {} to {} during physical rewrite",
414 slot_idx,
415 existing.dtype(),
416 replacement.dtype()
417 );
418 vortex_ensure!(
419 existing.len() == replacement.len(),
420 "slot {} len changed from {} to {} during physical rewrite",
421 slot_idx,
422 existing.len(),
423 replacement.len()
424 );
425 let mut slots = slots;
426 slots[slot_idx] = Some(replacement);
427 self.with_slots(slots)
428 }
429
430 pub fn with_slots(self, slots: Vec<Option<ArrayRef>>) -> VortexResult<ArrayRef> {
435 let old_slots = self.slots();
436 vortex_ensure!(
437 old_slots.len() == slots.len(),
438 "slot count changed from {} to {} during physical rewrite",
439 old_slots.len(),
440 slots.len()
441 );
442 for (idx, (old_slot, new_slot)) in old_slots.iter().zip(slots.iter()).enumerate() {
443 vortex_ensure!(
444 old_slot.is_some() == new_slot.is_some(),
445 "slot {} presence changed during physical rewrite",
446 idx
447 );
448 if let (Some(old_slot), Some(new_slot)) = (old_slot.as_ref(), new_slot.as_ref()) {
449 vortex_ensure!(
450 old_slot.dtype() == new_slot.dtype(),
451 "slot {} dtype changed from {} to {} during physical rewrite",
452 idx,
453 old_slot.dtype(),
454 new_slot.dtype()
455 );
456 vortex_ensure!(
457 old_slot.len() == new_slot.len(),
458 "slot {} len changed from {} to {} during physical rewrite",
459 idx,
460 old_slot.len(),
461 new_slot.len()
462 );
463 }
464 }
465 let inner = Arc::clone(&self.0);
466 inner.with_slots(self, slots)
467 }
468
469 pub fn reduce(&self) -> VortexResult<Option<ArrayRef>> {
470 self.0.reduce(self)
471 }
472
473 pub fn reduce_parent(
474 &self,
475 parent: &ArrayRef,
476 child_idx: usize,
477 ) -> VortexResult<Option<ArrayRef>> {
478 self.0.reduce_parent(self, parent, child_idx)
479 }
480
481 pub(crate) fn execute_encoding(
482 self,
483 ctx: &mut ExecutionCtx,
484 ) -> VortexResult<crate::ExecutionResult> {
485 let inner = Arc::clone(&self.0);
486 inner.execute(self, ctx)
487 }
488
489 pub fn execute_parent(
490 &self,
491 parent: &ArrayRef,
492 child_idx: usize,
493 ctx: &mut ExecutionCtx,
494 ) -> VortexResult<Option<ArrayRef>> {
495 self.0.execute_parent(self, parent, child_idx, ctx)
496 }
497
498 pub fn children(&self) -> Vec<ArrayRef> {
502 self.0.children(self)
503 }
504
505 pub fn nchildren(&self) -> usize {
507 self.0.nchildren(self)
508 }
509
510 pub fn nth_child(&self, idx: usize) -> Option<ArrayRef> {
512 self.0.nth_child(self, idx)
513 }
514
515 pub fn children_names(&self) -> Vec<String> {
517 self.0.children_names(self)
518 }
519
520 pub fn named_children(&self) -> Vec<(String, ArrayRef)> {
522 self.0.named_children(self)
523 }
524
525 pub fn buffers(&self) -> Vec<ByteBuffer> {
527 self.0.buffers(self)
528 }
529
530 pub fn buffer_handles(&self) -> Vec<BufferHandle> {
532 self.0.buffer_handles(self)
533 }
534
535 pub fn buffer_names(&self) -> Vec<String> {
537 self.0.buffer_names(self)
538 }
539
540 pub fn named_buffers(&self) -> Vec<(String, BufferHandle)> {
542 self.0.named_buffers(self)
543 }
544
545 pub fn nbuffers(&self) -> usize {
547 self.0.nbuffers(self)
548 }
549
550 pub fn slots(&self) -> &[Option<ArrayRef>] {
552 self.0.slots()
553 }
554
555 pub fn slot_name(&self, idx: usize) -> String {
557 self.0.slot_name(self, idx)
558 }
559
560 pub fn metadata_fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
562 self.0.metadata_fmt(f)
563 }
564
565 pub fn is_host(&self) -> bool {
567 for array in self.depth_first_traversal() {
568 if !array.buffer_handles().iter().all(BufferHandle::is_on_host) {
569 return false;
570 }
571 }
572 true
573 }
574
575 pub fn nbuffers_recursive(&self) -> usize {
579 self.children()
580 .iter()
581 .map(|c| c.nbuffers_recursive())
582 .sum::<usize>()
583 + self.nbuffers()
584 }
585
586 pub fn depth_first_traversal(&self) -> DepthFirstArrayIterator {
588 DepthFirstArrayIterator {
589 stack: vec![self.clone()],
590 }
591 }
592}
593
/// An `ArrayRef` is already an array handle, so the conversion is the identity.
impl IntoArray for ArrayRef {
    /// Returns `self` unchanged.
    #[inline(always)]
    fn into_array(self) -> ArrayRef {
        self
    }
}
600
601impl<V: VTable> Matcher for V {
602 type Match<'a> = ArrayView<'a, V>;
603
604 fn matches(array: &ArrayRef) -> bool {
605 array.0.as_any().is::<ArrayInner<V>>()
606 }
607
608 fn try_match<'a>(array: &'a ArrayRef) -> Option<ArrayView<'a, V>> {
609 let inner = array.0.as_any().downcast_ref::<ArrayInner<V>>()?;
610 Some(unsafe { ArrayView::new_unchecked(array, &inner.data) })
611 }
612}