1use std::any::type_name;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::hash::Hasher;
8use std::ops::Range;
9use std::sync::Arc;
10
11use vortex_buffer::ByteBuffer;
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_error::vortex_ensure;
15use vortex_error::vortex_err;
16use vortex_error::vortex_panic;
17use vortex_mask::Mask;
18use vortex_session::VortexSession;
19
20use crate::AnyCanonical;
21use crate::Array;
22use crate::ArrayEq;
23use crate::ArrayHash;
24use crate::ArrayView;
25use crate::Canonical;
26use crate::ExecutionCtx;
27use crate::IntoArray;
28use crate::LEGACY_SESSION;
29use crate::ToCanonical;
30use crate::VTable;
31use crate::VortexSessionExecute;
32use crate::aggregate_fn::fns::sum::sum;
33use crate::array::ArrayId;
34use crate::array::ArrayInner;
35use crate::array::DynArray;
36use crate::arrays::Bool;
37use crate::arrays::Constant;
38use crate::arrays::DictArray;
39use crate::arrays::FilterArray;
40use crate::arrays::Null;
41use crate::arrays::Primitive;
42use crate::arrays::SliceArray;
43use crate::arrays::VarBin;
44use crate::arrays::VarBinView;
45use crate::arrays::bool::BoolArrayExt;
46use crate::buffer::BufferHandle;
47use crate::builders::ArrayBuilder;
48use crate::dtype::DType;
49use crate::dtype::Nullability;
50use crate::expr::stats::Precision;
51use crate::expr::stats::Stat;
52use crate::expr::stats::StatsProviderExt;
53use crate::matcher::Matcher;
54use crate::optimizer::ArrayOptimizer;
55use crate::scalar::Scalar;
56use crate::stats::StatsSetRef;
57use crate::validity::Validity;
58
59pub struct DepthFirstArrayIterator {
61 stack: Vec<ArrayRef>,
62}
63
64impl Iterator for DepthFirstArrayIterator {
65 type Item = ArrayRef;
66
67 fn next(&mut self) -> Option<Self::Item> {
68 let next = self.stack.pop()?;
69 for child in next.children().into_iter().rev() {
70 self.stack.push(child);
71 }
72 Some(next)
73 }
74}
75
/// A type-erased, reference-counted handle to an array of any encoding.
///
/// Cloning an `ArrayRef` clones the inner `Arc`, so clones share the same underlying array
/// (see `ArrayRef::ptr_eq`).
#[derive(Clone)]
pub struct ArrayRef(Arc<dyn DynArray>);
79
80impl ArrayRef {
81 pub(crate) fn from_inner(inner: Arc<dyn DynArray>) -> Self {
83 Self(inner)
84 }
85
86 #[doc(hidden)]
89 pub fn addr(&self) -> usize {
90 Arc::as_ptr(&self.0).addr()
91 }
92
93 #[inline(always)]
95 pub(crate) fn inner(&self) -> &Arc<dyn DynArray> {
96 &self.0
97 }
98
99 #[inline(always)]
101 pub(crate) fn into_inner(self) -> Arc<dyn DynArray> {
102 self.0
103 }
104
105 pub fn ptr_eq(this: &ArrayRef, other: &ArrayRef) -> bool {
107 Arc::ptr_eq(&this.0, &other.0)
108 }
109}
110
111impl Debug for ArrayRef {
112 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
113 Debug::fmt(&*self.0, f)
114 }
115}
116
117impl ArrayHash for ArrayRef {
118 fn array_hash<H: Hasher>(&self, state: &mut H, precision: crate::Precision) {
119 self.0.dyn_array_hash(state as &mut dyn Hasher, precision);
120 }
121}
122
123impl ArrayEq for ArrayRef {
124 fn array_eq(&self, other: &Self, precision: crate::Precision) -> bool {
125 self.0.dyn_array_eq(other, precision)
126 }
127}
128
129#[allow(clippy::same_name_method)]
130impl ArrayRef {
131 #[inline]
133 pub fn len(&self) -> usize {
134 self.0.len()
135 }
136
137 #[inline]
139 pub fn is_empty(&self) -> bool {
140 self.0.len() == 0
141 }
142
143 #[inline]
145 pub fn dtype(&self) -> &DType {
146 self.0.dtype()
147 }
148
149 #[inline]
151 pub fn encoding_id(&self) -> ArrayId {
152 self.0.encoding_id()
153 }
154
155 pub fn slice(&self, range: Range<usize>) -> VortexResult<ArrayRef> {
157 let len = self.len();
158 let start = range.start;
159 let stop = range.end;
160
161 if start == 0 && stop == len {
162 return Ok(self.clone());
163 }
164
165 vortex_ensure!(start <= len, "OutOfBounds: start {start} > length {}", len);
166 vortex_ensure!(stop <= len, "OutOfBounds: stop {stop} > length {}", len);
167
168 vortex_ensure!(start <= stop, "start ({start}) must be <= stop ({stop})");
169
170 if start == stop {
171 return Ok(Canonical::empty(self.dtype()).into_array());
172 }
173
174 let sliced = SliceArray::try_new(self.clone(), range)?
175 .into_array()
176 .optimize()?;
177
178 if !sliced.is::<Constant>() {
180 self.statistics().with_iter(|iter| {
181 sliced.statistics().inherit(iter.filter(|(stat, value)| {
182 matches!(
183 stat,
184 Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted
185 ) && value.as_ref().as_exact().is_some_and(|v| {
186 Scalar::try_new(DType::Bool(Nullability::NonNullable), Some(v.clone()))
187 .vortex_expect("A stat that was expected to be a boolean stat was not")
188 .as_bool()
189 .value()
190 .unwrap_or_default()
191 })
192 }));
193 });
194 }
195
196 Ok(sliced)
197 }
198
199 pub fn filter(&self, mask: Mask) -> VortexResult<ArrayRef> {
201 FilterArray::try_new(self.clone(), mask)?
202 .into_array()
203 .optimize()
204 }
205
206 pub fn take(&self, indices: ArrayRef) -> VortexResult<ArrayRef> {
208 DictArray::try_new(indices, self.clone())?
209 .into_array()
210 .optimize()
211 }
212
213 pub fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
215 vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
216 if self.is_invalid(index)? {
217 return Ok(Scalar::null(self.dtype().clone()));
218 }
219 let scalar = self.0.scalar_at(self, index)?;
220 vortex_ensure!(self.dtype() == scalar.dtype(), "Scalar dtype mismatch");
221 Ok(scalar)
222 }
223
224 pub fn is_valid(&self, index: usize) -> VortexResult<bool> {
226 vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
227 match self.validity()? {
228 Validity::NonNullable | Validity::AllValid => Ok(true),
229 Validity::AllInvalid => Ok(false),
230 Validity::Array(a) => a
231 .scalar_at(index)?
232 .as_bool()
233 .value()
234 .ok_or_else(|| vortex_err!("validity value at index {} is null", index)),
235 }
236 }
237
238 pub fn is_invalid(&self, index: usize) -> VortexResult<bool> {
240 Ok(!self.is_valid(index)?)
241 }
242
243 pub fn all_valid(&self) -> VortexResult<bool> {
245 match self.validity()? {
246 Validity::NonNullable | Validity::AllValid => Ok(true),
247 Validity::AllInvalid => Ok(false),
248 Validity::Array(a) => Ok(a.statistics().compute_min::<bool>().unwrap_or(false)),
249 }
250 }
251
252 pub fn all_invalid(&self) -> VortexResult<bool> {
254 match self.validity()? {
255 Validity::NonNullable | Validity::AllValid => Ok(false),
256 Validity::AllInvalid => Ok(true),
257 Validity::Array(a) => Ok(!a.statistics().compute_max::<bool>().unwrap_or(true)),
258 }
259 }
260
261 pub fn valid_count(&self) -> VortexResult<usize> {
263 let len = self.len();
264 if let Some(Precision::Exact(invalid_count)) =
265 self.statistics().get_as::<usize>(Stat::NullCount)
266 {
267 return Ok(len - invalid_count);
268 }
269
270 let count = match self.validity()? {
271 Validity::NonNullable | Validity::AllValid => len,
272 Validity::AllInvalid => 0,
273 Validity::Array(a) => {
274 let mut ctx = LEGACY_SESSION.create_execution_ctx();
275 let array_sum = sum(&a, &mut ctx)?;
276 array_sum
277 .as_primitive()
278 .as_::<usize>()
279 .ok_or_else(|| vortex_err!("sum of validity array is null"))?
280 }
281 };
282 vortex_ensure!(count <= len, "Valid count exceeds array length");
283
284 self.statistics()
285 .set(Stat::NullCount, Precision::exact(len - count));
286
287 Ok(count)
288 }
289
290 pub fn invalid_count(&self) -> VortexResult<usize> {
292 Ok(self.len() - self.valid_count()?)
293 }
294
295 pub fn validity(&self) -> VortexResult<Validity> {
297 self.0.validity(self)
298 }
299
300 pub fn validity_mask(&self) -> VortexResult<Mask> {
302 match self.validity()? {
303 Validity::NonNullable | Validity::AllValid => Ok(Mask::new_true(self.len())),
304 Validity::AllInvalid => Ok(Mask::new_false(self.len())),
305 Validity::Array(a) => Ok(a.to_bool().to_mask()),
306 }
307 }
308
309 pub fn into_canonical(self) -> VortexResult<Canonical> {
311 self.execute(&mut LEGACY_SESSION.create_execution_ctx())
312 }
313
314 pub fn to_canonical(&self) -> VortexResult<Canonical> {
316 self.clone().into_canonical()
317 }
318
319 pub fn append_to_builder(
321 &self,
322 builder: &mut dyn ArrayBuilder,
323 ctx: &mut ExecutionCtx,
324 ) -> VortexResult<()> {
325 self.0.append_to_builder(self, builder, ctx)
326 }
327
328 pub fn statistics(&self) -> StatsSetRef<'_> {
330 self.0.statistics().to_ref(self)
331 }
332
333 pub fn is<M: Matcher>(&self) -> bool {
335 M::matches(self)
336 }
337
338 pub fn as_<M: Matcher>(&self) -> M::Match<'_> {
340 self.as_opt::<M>().vortex_expect("Failed to downcast")
341 }
342
343 pub fn as_opt<M: Matcher>(&self) -> Option<M::Match<'_>> {
345 M::try_match(self)
346 }
347
348 pub fn try_downcast<V: VTable>(self) -> Result<Array<V>, ArrayRef> {
350 Array::<V>::try_from_array_ref(self)
351 }
352
353 pub fn downcast<V: VTable>(self) -> Array<V> {
359 Self::try_downcast(self)
360 .unwrap_or_else(|_| vortex_panic!("Failed to downcast to {}", type_name::<V>()))
361 }
362
363 pub fn as_typed<V: VTable>(&self) -> Option<ArrayView<'_, V>> {
365 let inner = self.0.as_any().downcast_ref::<ArrayInner<V>>()?;
366 Some(unsafe { ArrayView::new_unchecked(self, &inner.data) })
367 }
368
369 pub fn as_constant(&self) -> Option<Scalar> {
371 self.as_opt::<Constant>().map(|a| a.scalar().clone())
372 }
373
374 pub fn nbytes(&self) -> u64 {
376 let mut nbytes = 0;
377 for array in self.depth_first_traversal() {
378 for buffer in array.buffers() {
379 nbytes += buffer.len() as u64;
380 }
381 }
382 nbytes
383 }
384
385 pub fn is_arrow(&self) -> bool {
387 self.is::<Null>()
388 || self.is::<Bool>()
389 || self.is::<Primitive>()
390 || self.is::<VarBin>()
391 || self.is::<VarBinView>()
392 }
393
394 pub fn is_canonical(&self) -> bool {
396 self.is::<AnyCanonical>()
397 }
398
399 pub fn with_slot(self, slot_idx: usize, replacement: ArrayRef) -> VortexResult<ArrayRef> {
406 let slots = self.slots().to_vec();
407 let nslots = slots.len();
408 vortex_ensure!(
409 slot_idx < nslots,
410 "slot index {} out of bounds for array with {} slots",
411 slot_idx,
412 nslots
413 );
414 let existing = slots[slot_idx]
415 .as_ref()
416 .vortex_expect("with_slot cannot replace an absent slot");
417 vortex_ensure!(
418 existing.dtype() == replacement.dtype(),
419 "slot {} dtype changed from {} to {} during physical rewrite",
420 slot_idx,
421 existing.dtype(),
422 replacement.dtype()
423 );
424 vortex_ensure!(
425 existing.len() == replacement.len(),
426 "slot {} len changed from {} to {} during physical rewrite",
427 slot_idx,
428 existing.len(),
429 replacement.len()
430 );
431 let mut slots = slots;
432 slots[slot_idx] = Some(replacement);
433 self.with_slots(slots)
434 }
435
436 pub fn with_slots(self, slots: Vec<Option<ArrayRef>>) -> VortexResult<ArrayRef> {
441 let old_slots = self.slots();
442 vortex_ensure!(
443 old_slots.len() == slots.len(),
444 "slot count changed from {} to {} during physical rewrite",
445 old_slots.len(),
446 slots.len()
447 );
448 for (idx, (old_slot, new_slot)) in old_slots.iter().zip(slots.iter()).enumerate() {
449 vortex_ensure!(
450 old_slot.is_some() == new_slot.is_some(),
451 "slot {} presence changed during physical rewrite",
452 idx
453 );
454 if let (Some(old_slot), Some(new_slot)) = (old_slot.as_ref(), new_slot.as_ref()) {
455 vortex_ensure!(
456 old_slot.dtype() == new_slot.dtype(),
457 "slot {} dtype changed from {} to {} during physical rewrite",
458 idx,
459 old_slot.dtype(),
460 new_slot.dtype()
461 );
462 vortex_ensure!(
463 old_slot.len() == new_slot.len(),
464 "slot {} len changed from {} to {} during physical rewrite",
465 idx,
466 old_slot.len(),
467 new_slot.len()
468 );
469 }
470 }
471 let inner = Arc::clone(&self.0);
472 inner.with_slots(self, slots)
473 }
474
475 pub fn reduce(&self) -> VortexResult<Option<ArrayRef>> {
476 self.0.reduce(self)
477 }
478
479 pub fn reduce_parent(
480 &self,
481 parent: &ArrayRef,
482 child_idx: usize,
483 ) -> VortexResult<Option<ArrayRef>> {
484 self.0.reduce_parent(self, parent, child_idx)
485 }
486
487 pub(crate) fn execute_encoding(
488 self,
489 ctx: &mut ExecutionCtx,
490 ) -> VortexResult<crate::ExecutionResult> {
491 let inner = Arc::clone(&self.0);
492 inner.execute(self, ctx)
493 }
494
495 pub fn execute_parent(
496 &self,
497 parent: &ArrayRef,
498 child_idx: usize,
499 ctx: &mut ExecutionCtx,
500 ) -> VortexResult<Option<ArrayRef>> {
501 self.0.execute_parent(self, parent, child_idx, ctx)
502 }
503
504 pub fn children(&self) -> Vec<ArrayRef> {
508 self.0.children(self)
509 }
510
511 pub fn nchildren(&self) -> usize {
513 self.0.nchildren(self)
514 }
515
516 pub fn nth_child(&self, idx: usize) -> Option<ArrayRef> {
518 self.0.nth_child(self, idx)
519 }
520
521 pub fn children_names(&self) -> Vec<String> {
523 self.0.children_names(self)
524 }
525
526 pub fn named_children(&self) -> Vec<(String, ArrayRef)> {
528 self.0.named_children(self)
529 }
530
531 pub fn buffers(&self) -> Vec<ByteBuffer> {
533 self.0.buffers(self)
534 }
535
536 pub fn buffer_handles(&self) -> Vec<BufferHandle> {
538 self.0.buffer_handles(self)
539 }
540
541 pub fn buffer_names(&self) -> Vec<String> {
543 self.0.buffer_names(self)
544 }
545
546 pub fn named_buffers(&self) -> Vec<(String, BufferHandle)> {
548 self.0.named_buffers(self)
549 }
550
551 pub fn nbuffers(&self) -> usize {
553 self.0.nbuffers(self)
554 }
555
556 pub fn slots(&self) -> &[Option<ArrayRef>] {
558 self.0.slots()
559 }
560
561 pub fn slot_name(&self, idx: usize) -> String {
563 self.0.slot_name(self, idx)
564 }
565
566 pub fn metadata(&self, session: &VortexSession) -> VortexResult<Option<Vec<u8>>> {
568 self.0.metadata(self, session)
569 }
570
571 pub fn metadata_fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
573 self.0.metadata_fmt(f)
574 }
575
576 pub fn is_host(&self) -> bool {
578 for array in self.depth_first_traversal() {
579 if !array.buffer_handles().iter().all(BufferHandle::is_on_host) {
580 return false;
581 }
582 }
583 true
584 }
585
586 pub fn nbuffers_recursive(&self) -> usize {
590 self.children()
591 .iter()
592 .map(|c| c.nbuffers_recursive())
593 .sum::<usize>()
594 + self.nbuffers()
595 }
596
597 pub fn depth_first_traversal(&self) -> DepthFirstArrayIterator {
599 DepthFirstArrayIterator {
600 stack: vec![self.clone()],
601 }
602 }
603}
604
impl IntoArray for ArrayRef {
    /// An `ArrayRef` is already an array handle, so this is the identity conversion.
    #[inline(always)]
    fn into_array(self) -> ArrayRef {
        self
    }
}
611
612impl<V: VTable> Matcher for V {
613 type Match<'a> = ArrayView<'a, V>;
614
615 fn matches(array: &ArrayRef) -> bool {
616 array.0.as_any().is::<ArrayInner<V>>()
617 }
618
619 fn try_match<'a>(array: &'a ArrayRef) -> Option<ArrayView<'a, V>> {
620 let inner = array.0.as_any().downcast_ref::<ArrayInner<V>>()?;
621 Some(unsafe { ArrayView::new_unchecked(array, &inner.data) })
622 }
623}