1mod visitor;
5
6use std::any::Any;
7use std::fmt::Debug;
8use std::fmt::Formatter;
9use std::hash::Hash;
10use std::hash::Hasher;
11use std::ops::Deref;
12use std::ops::Range;
13use std::sync::Arc;
14
15pub use visitor::*;
16use vortex_buffer::ByteBuffer;
17use vortex_error::VortexExpect;
18use vortex_error::VortexResult;
19use vortex_error::vortex_ensure;
20use vortex_error::vortex_err;
21use vortex_error::vortex_panic;
22use vortex_mask::Mask;
23
24use crate::AnyCanonical;
25use crate::ArrayEq;
26use crate::ArrayHash;
27use crate::Canonical;
28use crate::DynArrayEq;
29use crate::DynArrayHash;
30use crate::ExecutionCtx;
31use crate::LEGACY_SESSION;
32use crate::ToCanonical;
33use crate::VortexSessionExecute;
34use crate::aggregate_fn::fns::sum::sum;
35use crate::arrays::Bool;
36use crate::arrays::Constant;
37use crate::arrays::DictArray;
38use crate::arrays::FilterArray;
39use crate::arrays::Null;
40use crate::arrays::Primitive;
41use crate::arrays::ScalarFnVTable;
42use crate::arrays::SliceArray;
43use crate::arrays::VarBin;
44use crate::arrays::VarBinView;
45use crate::buffer::BufferHandle;
46use crate::builders::ArrayBuilder;
47use crate::dtype::DType;
48use crate::dtype::Nullability;
49use crate::expr::stats::Precision;
50use crate::expr::stats::Stat;
51use crate::expr::stats::StatsProviderExt;
52use crate::hash;
53use crate::matcher::Matcher;
54use crate::optimizer::ArrayOptimizer;
55use crate::scalar::Scalar;
56use crate::scalar_fn::ReduceNode;
57use crate::scalar_fn::ReduceNodeRef;
58use crate::scalar_fn::ScalarFnRef;
59use crate::stats::StatsSetRef;
60use crate::validity::Validity;
61use crate::vtable::ArrayId;
62use crate::vtable::DynVTable;
63use crate::vtable::OperationsVTable;
64use crate::vtable::VTable;
65use crate::vtable::ValidityVTable;
66
/// Object-safe interface shared by every array and used through [`ArrayRef`].
///
/// The trait is sealed by the private `Sealed` supertrait. In this file it is implemented by
/// [`ArrayAdapter<V>`], which routes each call to the encoding's [`VTable`] `V`, and by
/// [`ArrayRef`] itself, which forwards every call to the array it wraps.
pub trait DynArray:
    'static
    + private::Sealed
    + Send
    + Sync
    + Debug
    + DynArrayEq
    + DynArrayHash
    + ArrayVisitor
    + ReduceNode
{
    /// Returns `self` as [`Any`] so callers can downcast to the concrete array type.
    fn as_any(&self) -> &dyn Any;

    /// Converts a shared array into a shared [`Any`] so callers can downcast it by value
    /// (see [`Arc::downcast`]).
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;

    /// Returns a reference-counted [`ArrayRef`] handle to this array.
    fn to_array(&self) -> ArrayRef;

    /// Returns the number of elements in the array.
    fn len(&self) -> usize;

    /// Returns `true` if the array contains no elements.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the logical data type of the array's elements.
    fn dtype(&self) -> &DType;

    /// Returns the vtable of this array's encoding.
    fn vtable(&self) -> &dyn DynVTable;

    /// Returns the identifier of this array's encoding.
    fn encoding_id(&self) -> ArrayId;

    /// Returns the elements in `range`.
    ///
    /// The [`ArrayAdapter`] implementation errors when a bound exceeds the length or the
    /// range is reversed.
    fn slice(&self, range: Range<usize>) -> VortexResult<ArrayRef>;

    /// Returns the elements selected by `mask`.
    fn filter(&self, mask: Mask) -> VortexResult<ArrayRef>;

    /// Returns the elements at the positions listed in the `indices` array.
    fn take(&self, indices: ArrayRef) -> VortexResult<ArrayRef>;

    /// Returns the element at `index` as a [`Scalar`].
    ///
    /// The [`ArrayAdapter`] implementation errors on an out-of-bounds `index` and returns a
    /// null scalar for invalid positions.
    fn scalar_at(&self, index: usize) -> VortexResult<Scalar>;

    /// Returns `true` if the element at `index` is valid (non-null).
    fn is_valid(&self, index: usize) -> VortexResult<bool>;

    /// Returns `true` if the element at `index` is null.
    fn is_invalid(&self, index: usize) -> VortexResult<bool>;

    /// Returns `true` if every element is valid.
    ///
    /// The [`ArrayAdapter`] implementation answers `false` when the minimum of its validity
    /// array cannot be computed.
    fn all_valid(&self) -> VortexResult<bool>;

    /// Returns `true` if every element is null.
    ///
    /// The [`ArrayAdapter`] implementation answers `false` when the maximum of its validity
    /// array cannot be computed.
    fn all_invalid(&self) -> VortexResult<bool>;

    /// Returns the number of valid (non-null) elements.
    fn valid_count(&self) -> VortexResult<usize>;

    /// Returns the number of null elements.
    fn invalid_count(&self) -> VortexResult<usize>;

    /// Returns the array's [`Validity`]; non-nullable dtypes report `Validity::NonNullable`.
    fn validity(&self) -> VortexResult<Validity>;

    /// Returns the validity as a [`Mask`] (set bits mark valid elements).
    fn validity_mask(&self) -> VortexResult<Mask>;

    /// Decodes the array into its [`Canonical`] form.
    fn to_canonical(&self) -> VortexResult<Canonical>;

    /// Appends every element of this array to `builder`.
    ///
    /// The builder's dtype must equal this array's dtype; the [`ArrayAdapter`]
    /// implementation panics otherwise.
    fn append_to_builder(
        &self,
        builder: &mut dyn ArrayBuilder,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<()>;

    /// Returns a handle to this array's statistics set.
    fn statistics(&self) -> StatsSetRef<'_>;

    /// Returns a copy of this array with its children replaced by `children`.
    fn with_children(&self, children: Vec<ArrayRef>) -> VortexResult<ArrayRef>;
}
168
169impl DynArray for Arc<dyn DynArray> {
170 #[inline]
171 fn as_any(&self) -> &dyn Any {
172 DynArray::as_any(self.as_ref())
173 }
174
175 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
176 self
177 }
178
179 #[inline]
180 fn to_array(&self) -> ArrayRef {
181 self.clone()
182 }
183
184 #[inline]
185 fn len(&self) -> usize {
186 self.as_ref().len()
187 }
188
189 #[inline]
190 fn dtype(&self) -> &DType {
191 self.as_ref().dtype()
192 }
193
194 fn vtable(&self) -> &dyn DynVTable {
195 self.as_ref().vtable()
196 }
197
198 #[inline]
199 fn encoding_id(&self) -> ArrayId {
200 self.as_ref().encoding_id()
201 }
202
203 #[inline]
204 fn slice(&self, range: Range<usize>) -> VortexResult<ArrayRef> {
205 self.as_ref().slice(range)
206 }
207
208 fn filter(&self, mask: Mask) -> VortexResult<ArrayRef> {
209 self.as_ref().filter(mask)
210 }
211
212 fn take(&self, indices: ArrayRef) -> VortexResult<ArrayRef> {
213 self.as_ref().take(indices)
214 }
215
216 #[inline]
217 fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
218 self.as_ref().scalar_at(index)
219 }
220
221 #[inline]
222 fn is_valid(&self, index: usize) -> VortexResult<bool> {
223 self.as_ref().is_valid(index)
224 }
225
226 #[inline]
227 fn is_invalid(&self, index: usize) -> VortexResult<bool> {
228 self.as_ref().is_invalid(index)
229 }
230
231 #[inline]
232 fn all_valid(&self) -> VortexResult<bool> {
233 self.as_ref().all_valid()
234 }
235
236 #[inline]
237 fn all_invalid(&self) -> VortexResult<bool> {
238 self.as_ref().all_invalid()
239 }
240
241 #[inline]
242 fn valid_count(&self) -> VortexResult<usize> {
243 self.as_ref().valid_count()
244 }
245
246 #[inline]
247 fn invalid_count(&self) -> VortexResult<usize> {
248 self.as_ref().invalid_count()
249 }
250
251 #[inline]
252 fn validity(&self) -> VortexResult<Validity> {
253 self.as_ref().validity()
254 }
255
256 #[inline]
257 fn validity_mask(&self) -> VortexResult<Mask> {
258 self.as_ref().validity_mask()
259 }
260
261 fn to_canonical(&self) -> VortexResult<Canonical> {
262 self.as_ref().to_canonical()
263 }
264
265 fn append_to_builder(
266 &self,
267 builder: &mut dyn ArrayBuilder,
268 ctx: &mut ExecutionCtx,
269 ) -> VortexResult<()> {
270 self.as_ref().append_to_builder(builder, ctx)
271 }
272
273 fn statistics(&self) -> StatsSetRef<'_> {
274 self.as_ref().statistics()
275 }
276
277 fn with_children(&self, children: Vec<ArrayRef>) -> VortexResult<ArrayRef> {
278 self.as_ref().with_children(children)
279 }
280}
281
/// A shared, reference-counted handle to an array of any encoding.
pub type ArrayRef = Arc<dyn DynArray>;
284
285impl ToOwned for dyn DynArray {
286 type Owned = ArrayRef;
287
288 fn to_owned(&self) -> Self::Owned {
289 self.to_array()
290 }
291}
292
293impl dyn DynArray + '_ {
294 pub fn is<M: Matcher>(&self) -> bool {
296 M::matches(self)
297 }
298
299 pub fn as_<M: Matcher>(&self) -> M::Match<'_> {
301 self.as_opt::<M>().vortex_expect("Failed to downcast")
302 }
303
304 pub fn as_opt<M: Matcher>(&self) -> Option<M::Match<'_>> {
306 M::try_match(self)
307 }
308
309 pub fn try_into<V: VTable>(self: Arc<Self>) -> Result<V::Array, Arc<Self>> {
311 match self.is::<V>() {
312 true => {
313 let arc = self
314 .as_any_arc()
315 .downcast::<ArrayAdapter<V>>()
316 .map_err(|_| vortex_err!("failed to downcast"))
317 .vortex_expect("Failed to downcast");
318 Ok(match Arc::try_unwrap(arc) {
319 Ok(array) => array.0,
320 Err(arc) => arc.deref().0.clone(),
321 })
322 }
323 false => Err(self),
324 }
325 }
326
327 pub fn as_constant(&self) -> Option<Scalar> {
328 self.as_opt::<Constant>().map(|a| a.scalar().clone())
329 }
330
331 pub fn nbytes(&self) -> u64 {
333 let mut nbytes = 0;
334 for array in self.depth_first_traversal() {
335 for buffer in array.buffers() {
336 nbytes += buffer.len() as u64;
337 }
338 }
339 nbytes
340 }
341
342 pub fn is_arrow(&self) -> bool {
344 self.is::<Null>()
345 || self.is::<Bool>()
346 || self.is::<Primitive>()
347 || self.is::<VarBin>()
348 || self.is::<VarBinView>()
349 }
350
351 pub fn is_canonical(&self) -> bool {
353 self.is::<AnyCanonical>()
354 }
355
356 pub fn with_child(&self, child_idx: usize, replacement: ArrayRef) -> VortexResult<ArrayRef> {
358 let mut children: Vec<ArrayRef> = self.children();
359 vortex_ensure!(
360 child_idx < children.len(),
361 "child index {} out of bounds for array with {} children",
362 child_idx,
363 children.len()
364 );
365 children[child_idx] = replacement;
366 self.with_children(children)
367 }
368}
369
/// Conversion of a value into a type-erased [`ArrayRef`], consuming the value.
pub trait IntoArray {
    /// Consumes `self` and returns it as an [`ArrayRef`].
    fn into_array(self) -> ArrayRef;
}
374
/// An [`ArrayRef`] is already type-erased, so conversion is the identity.
impl IntoArray for ArrayRef {
    fn into_array(self) -> ArrayRef {
        self
    }
}
380
/// Sealing module for [`DynArray`].
///
/// `private` is not `pub`, so `Sealed` cannot be named outside this module and its children.
/// As a result, `DynArray` can only be implemented for the types listed here.
mod private {
    use super::*;

    /// Marker supertrait of [`DynArray`]; see the module docs.
    pub trait Sealed {}

    impl<V: VTable> Sealed for ArrayAdapter<V> {}
    impl Sealed for Arc<dyn DynArray> {}
}
389
/// Wraps an encoding-specific array `V::Array` so that it implements [`DynArray`] and can be
/// stored behind an [`ArrayRef`].
///
/// `#[repr(transparent)]` guarantees the adapter has the same layout as the wrapped
/// `V::Array`.
#[repr(transparent)]
pub struct ArrayAdapter<V: VTable>(V::Array);
398
399impl<V: VTable> ArrayAdapter<V> {
400 pub fn as_inner(&self) -> &V::Array {
402 &self.0
403 }
404
405 pub fn into_inner(self) -> V::Array {
407 self.0
408 }
409}
410
411impl<V: VTable> Debug for ArrayAdapter<V> {
412 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
413 self.0.fmt(f)
414 }
415}
416
417impl<V: VTable> ReduceNode for ArrayAdapter<V> {
418 fn as_any(&self) -> &dyn Any {
419 self
420 }
421
422 fn node_dtype(&self) -> VortexResult<DType> {
423 Ok(V::dtype(&self.0).clone())
424 }
425
426 fn scalar_fn(&self) -> Option<&ScalarFnRef> {
427 self.0.as_opt::<ScalarFnVTable>().map(|a| a.scalar_fn())
428 }
429
430 fn child(&self, idx: usize) -> ReduceNodeRef {
431 self.nth_child(idx)
432 .unwrap_or_else(|| vortex_panic!("Child index out of bounds: {}", idx))
433 }
434
435 fn child_count(&self) -> usize {
436 self.nchildren()
437 }
438}
439
440impl<V: VTable> DynArray for ArrayAdapter<V> {
441 fn as_any(&self) -> &dyn Any {
442 self
443 }
444
445 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
446 self
447 }
448
449 fn to_array(&self) -> ArrayRef {
450 Arc::new(ArrayAdapter::<V>(self.0.clone()))
451 }
452
453 fn len(&self) -> usize {
454 V::len(&self.0)
455 }
456
457 fn dtype(&self) -> &DType {
458 V::dtype(&self.0)
459 }
460
461 fn vtable(&self) -> &dyn DynVTable {
462 V::vtable(self.as_inner())
463 }
464
465 fn encoding_id(&self) -> ArrayId {
466 V::vtable(&self.0).id()
467 }
468
469 fn slice(&self, range: Range<usize>) -> VortexResult<ArrayRef> {
470 let start = range.start;
471 let stop = range.end;
472
473 if start == 0 && stop == self.len() {
474 return Ok(self.to_array());
475 }
476
477 vortex_ensure!(
478 start <= self.len(),
479 "OutOfBounds: start {start} > length {}",
480 self.len()
481 );
482 vortex_ensure!(
483 stop <= self.len(),
484 "OutOfBounds: stop {stop} > length {}",
485 self.len()
486 );
487
488 vortex_ensure!(start <= stop, "start ({start}) must be <= stop ({stop})");
489
490 if start == stop {
491 return Ok(Canonical::empty(self.dtype()).into_array());
492 }
493
494 let sliced = SliceArray::try_new(self.to_array(), range)?
495 .into_array()
496 .optimize()?;
497
498 if !sliced.is::<Constant>() {
500 self.statistics().with_iter(|iter| {
501 sliced.statistics().inherit(iter.filter(|(stat, value)| {
502 matches!(
503 stat,
504 Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted
505 ) && value.as_ref().as_exact().is_some_and(|v| {
506 Scalar::try_new(DType::Bool(Nullability::NonNullable), Some(v.clone()))
507 .vortex_expect("A stat that was expected to be a boolean stat was not")
508 .as_bool()
509 .value()
510 .unwrap_or_default()
511 })
512 }));
513 });
514 }
515
516 Ok(sliced)
517 }
518
519 fn filter(&self, mask: Mask) -> VortexResult<ArrayRef> {
520 FilterArray::try_new(self.to_array(), mask)?
521 .into_array()
522 .optimize()
523 }
524
525 fn take(&self, indices: ArrayRef) -> VortexResult<ArrayRef> {
526 DictArray::try_new(indices, self.to_array())?
527 .into_array()
528 .optimize()
529 }
530
531 fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
532 vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
533 if self.is_invalid(index)? {
534 return Ok(Scalar::null(self.dtype().clone()));
535 }
536 let scalar = <V::OperationsVTable as OperationsVTable<V>>::scalar_at(&self.0, index)?;
537 vortex_ensure!(self.dtype() == scalar.dtype(), "Scalar dtype mismatch");
538 Ok(scalar)
539 }
540
541 fn is_valid(&self, index: usize) -> VortexResult<bool> {
542 vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
543 match self.validity()? {
544 Validity::NonNullable | Validity::AllValid => Ok(true),
545 Validity::AllInvalid => Ok(false),
546 Validity::Array(a) => a
547 .scalar_at(index)?
548 .as_bool()
549 .value()
550 .ok_or_else(|| vortex_err!("validity value at index {} is null", index)),
551 }
552 }
553
554 fn is_invalid(&self, index: usize) -> VortexResult<bool> {
555 Ok(!self.is_valid(index)?)
556 }
557
558 fn all_valid(&self) -> VortexResult<bool> {
559 match self.validity()? {
560 Validity::NonNullable | Validity::AllValid => Ok(true),
561 Validity::AllInvalid => Ok(false),
562 Validity::Array(a) => Ok(a.statistics().compute_min::<bool>().unwrap_or(false)),
563 }
564 }
565
566 fn all_invalid(&self) -> VortexResult<bool> {
567 match self.validity()? {
568 Validity::NonNullable | Validity::AllValid => Ok(false),
569 Validity::AllInvalid => Ok(true),
570 Validity::Array(a) => Ok(!a.statistics().compute_max::<bool>().unwrap_or(true)),
571 }
572 }
573
574 fn valid_count(&self) -> VortexResult<usize> {
576 if let Some(Precision::Exact(invalid_count)) =
577 self.statistics().get_as::<usize>(Stat::NullCount)
578 {
579 return Ok(self.len() - invalid_count);
580 }
581
582 let count = match self.validity()? {
583 Validity::NonNullable | Validity::AllValid => self.len(),
584 Validity::AllInvalid => 0,
585 Validity::Array(a) => {
586 let mut ctx = LEGACY_SESSION.create_execution_ctx();
587 let array_sum = sum(&a, &mut ctx)?;
588 array_sum
589 .as_primitive()
590 .as_::<usize>()
591 .ok_or_else(|| vortex_err!("sum of validity array is null"))?
592 }
593 };
594 vortex_ensure!(count <= self.len(), "Valid count exceeds array length");
595
596 self.statistics()
597 .set(Stat::NullCount, Precision::exact(self.len() - count));
598
599 Ok(count)
600 }
601
602 fn invalid_count(&self) -> VortexResult<usize> {
603 Ok(self.len() - self.valid_count()?)
604 }
605
606 fn validity(&self) -> VortexResult<Validity> {
607 if self.dtype().is_nullable() {
608 let validity = <V::ValidityVTable as ValidityVTable<V>>::validity(&self.0)?;
609 if let Validity::Array(array) = &validity {
610 vortex_ensure!(array.len() == self.len(), "Validity array length mismatch");
611 vortex_ensure!(
612 matches!(array.dtype(), DType::Bool(Nullability::NonNullable)),
613 "Validity array is not non-nullable boolean: {}",
614 self.encoding_id(),
615 );
616 }
617 Ok(validity)
618 } else {
619 Ok(Validity::NonNullable)
620 }
621 }
622
623 fn validity_mask(&self) -> VortexResult<Mask> {
624 match self.validity()? {
625 Validity::NonNullable | Validity::AllValid => Ok(Mask::new_true(self.len())),
626 Validity::AllInvalid => Ok(Mask::new_false(self.len())),
627 Validity::Array(a) => Ok(a.to_bool().to_mask()),
628 }
629 }
630
631 fn to_canonical(&self) -> VortexResult<Canonical> {
632 self.to_array()
633 .execute(&mut LEGACY_SESSION.create_execution_ctx())
634 }
635
636 fn append_to_builder(
637 &self,
638 builder: &mut dyn ArrayBuilder,
639 ctx: &mut ExecutionCtx,
640 ) -> VortexResult<()> {
641 if builder.dtype() != self.dtype() {
642 vortex_panic!(
643 "Builder dtype mismatch: expected {}, got {}",
644 self.dtype(),
645 builder.dtype(),
646 );
647 }
648 let len = builder.len();
649
650 V::append_to_builder(&self.0, builder, ctx)?;
651
652 assert_eq!(
653 len + self.len(),
654 builder.len(),
655 "Builder length mismatch after writing array for encoding {}",
656 self.encoding_id(),
657 );
658 Ok(())
659 }
660
661 fn statistics(&self) -> StatsSetRef<'_> {
662 V::stats(&self.0)
663 }
664
665 fn with_children(&self, children: Vec<ArrayRef>) -> VortexResult<ArrayRef> {
666 let mut this = self.0.clone();
667 V::with_children(&mut this, children)?;
668 Ok(this.into_array())
669 }
670}
671
672impl<V: VTable> ArrayHash for ArrayAdapter<V> {
673 fn array_hash<H: Hasher>(&self, state: &mut H, precision: hash::Precision) {
674 self.0.encoding_id().hash(state);
675 V::array_hash(&self.0, state, precision);
676 }
677}
678
679impl<V: VTable> ArrayEq for ArrayAdapter<V> {
680 fn array_eq(&self, other: &Self, precision: hash::Precision) -> bool {
681 V::array_eq(&self.0, &other.0, precision)
682 }
683}
684
685impl<V: VTable> ArrayVisitor for ArrayAdapter<V> {
686 fn children(&self) -> Vec<ArrayRef> {
687 (0..V::nchildren(&self.0))
688 .map(|i| V::child(&self.0, i))
689 .collect()
690 }
691
692 fn nchildren(&self) -> usize {
693 V::nchildren(&self.0)
694 }
695
696 fn nth_child(&self, idx: usize) -> Option<ArrayRef> {
697 (idx < V::nchildren(&self.0)).then(|| V::child(&self.0, idx))
698 }
699
700 fn children_names(&self) -> Vec<String> {
701 (0..V::nchildren(&self.0))
702 .map(|i| V::child_name(&self.0, i))
703 .collect()
704 }
705
706 fn named_children(&self) -> Vec<(String, ArrayRef)> {
707 (0..V::nchildren(&self.0))
708 .map(|i| (V::child_name(&self.0, i), V::child(&self.0, i)))
709 .collect()
710 }
711
712 fn buffers(&self) -> Vec<ByteBuffer> {
713 (0..V::nbuffers(&self.0))
714 .map(|i| V::buffer(&self.0, i).to_host_sync())
715 .collect()
716 }
717
718 fn buffer_handles(&self) -> Vec<BufferHandle> {
719 (0..V::nbuffers(&self.0))
720 .map(|i| V::buffer(&self.0, i))
721 .collect()
722 }
723
724 fn buffer_names(&self) -> Vec<String> {
725 (0..V::nbuffers(&self.0))
726 .filter_map(|i| V::buffer_name(&self.0, i))
727 .collect()
728 }
729
730 fn named_buffers(&self) -> Vec<(String, BufferHandle)> {
731 (0..V::nbuffers(&self.0))
732 .filter_map(|i| V::buffer_name(&self.0, i).map(|name| (name, V::buffer(&self.0, i))))
733 .collect()
734 }
735
736 fn nbuffers(&self) -> usize {
737 V::nbuffers(&self.0)
738 }
739
740 fn metadata(&self) -> VortexResult<Option<Vec<u8>>> {
741 V::serialize(V::metadata(&self.0)?)
742 }
743
744 fn metadata_fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
745 match V::metadata(&self.0) {
746 Err(e) => write!(f, "<serde error: {e}>"),
747 Ok(metadata) => Debug::fmt(&metadata, f),
748 }
749 }
750
751 fn is_host(&self) -> bool {
752 for array in self.depth_first_traversal() {
753 if !array.buffer_handles().iter().all(BufferHandle::is_on_host) {
754 return false;
755 }
756 }
757
758 true
759 }
760}
761
762impl<V: VTable> Matcher for V {
764 type Match<'a> = &'a V::Array;
765
766 fn matches(array: &dyn DynArray) -> bool {
767 DynArray::as_any(array).is::<ArrayAdapter<V>>()
768 }
769
770 fn try_match<'a>(array: &'a dyn DynArray) -> Option<Self::Match<'a>> {
771 DynArray::as_any(array)
772 .downcast_ref::<ArrayAdapter<V>>()
773 .map(|array_adapter| &array_adapter.0)
774 }
775}