Skip to main content

vortex_array/array/
erased.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::type_name;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::hash::Hasher;
8use std::ops::Range;
9use std::sync::Arc;
10
11use vortex_buffer::ByteBuffer;
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_error::vortex_ensure;
15use vortex_error::vortex_err;
16use vortex_error::vortex_panic;
17use vortex_mask::Mask;
18use vortex_session::VortexSession;
19
20use crate::AnyCanonical;
21use crate::Array;
22use crate::ArrayEq;
23use crate::ArrayHash;
24use crate::ArrayView;
25use crate::Canonical;
26use crate::ExecutionCtx;
27use crate::IntoArray;
28use crate::LEGACY_SESSION;
29use crate::ToCanonical;
30use crate::VTable;
31use crate::VortexSessionExecute;
32use crate::aggregate_fn::fns::sum::sum;
33use crate::array::ArrayId;
34use crate::array::ArrayInner;
35use crate::array::DynArray;
36use crate::arrays::Bool;
37use crate::arrays::Constant;
38use crate::arrays::DictArray;
39use crate::arrays::FilterArray;
40use crate::arrays::Null;
41use crate::arrays::Primitive;
42use crate::arrays::SliceArray;
43use crate::arrays::VarBin;
44use crate::arrays::VarBinView;
45use crate::arrays::bool::BoolArrayExt;
46use crate::buffer::BufferHandle;
47use crate::builders::ArrayBuilder;
48use crate::dtype::DType;
49use crate::dtype::Nullability;
50use crate::expr::stats::Precision;
51use crate::expr::stats::Stat;
52use crate::expr::stats::StatsProviderExt;
53use crate::matcher::Matcher;
54use crate::optimizer::ArrayOptimizer;
55use crate::scalar::Scalar;
56use crate::stats::StatsSetRef;
57use crate::validity::Validity;
58
59/// A depth-first pre-order iterator over an Array.
60pub struct DepthFirstArrayIterator {
61    stack: Vec<ArrayRef>,
62}
63
64impl Iterator for DepthFirstArrayIterator {
65    type Item = ArrayRef;
66
67    fn next(&mut self) -> Option<Self::Item> {
68        let next = self.stack.pop()?;
69        for child in next.children().into_iter().rev() {
70            self.stack.push(child);
71        }
72        Some(next)
73    }
74}
75
76/// A reference-counted pointer to a type-erased array.
77#[derive(Clone)]
78pub struct ArrayRef(Arc<dyn DynArray>);
79
80impl ArrayRef {
81    /// Create from an `Arc<dyn DynArray>`.
82    pub(crate) fn from_inner(inner: Arc<dyn DynArray>) -> Self {
83        Self(inner)
84    }
85
86    /// Returns the Arc::as_ptr().addr() of the underlying array.
87    /// This function is used in a couple of places, and we should migrate them to using array_eq.
88    #[doc(hidden)]
89    pub fn addr(&self) -> usize {
90        Arc::as_ptr(&self.0).addr()
91    }
92
93    /// Returns a reference to the inner Arc.
94    #[inline(always)]
95    pub(crate) fn inner(&self) -> &Arc<dyn DynArray> {
96        &self.0
97    }
98
99    /// Consumes the array reference, returning the owned backing allocation.
100    #[inline(always)]
101    pub(crate) fn into_inner(self) -> Arc<dyn DynArray> {
102        self.0
103    }
104
105    /// Returns true if the two ArrayRefs point to the same allocation.
106    pub fn ptr_eq(this: &ArrayRef, other: &ArrayRef) -> bool {
107        Arc::ptr_eq(&this.0, &other.0)
108    }
109}
110
111impl Debug for ArrayRef {
112    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
113        Debug::fmt(&*self.0, f)
114    }
115}
116
117impl ArrayHash for ArrayRef {
118    fn array_hash<H: Hasher>(&self, state: &mut H, precision: crate::Precision) {
119        self.0.dyn_array_hash(state as &mut dyn Hasher, precision);
120    }
121}
122
123impl ArrayEq for ArrayRef {
124    fn array_eq(&self, other: &Self, precision: crate::Precision) -> bool {
125        self.0.dyn_array_eq(other, precision)
126    }
127}
128
129#[allow(clippy::same_name_method)]
130impl ArrayRef {
131    /// Returns the length of the array.
132    #[inline]
133    pub fn len(&self) -> usize {
134        self.0.len()
135    }
136
137    /// Returns whether the array is empty (has zero rows).
138    #[inline]
139    pub fn is_empty(&self) -> bool {
140        self.0.len() == 0
141    }
142
143    /// Returns the logical Vortex [`DType`] of the array.
144    #[inline]
145    pub fn dtype(&self) -> &DType {
146        self.0.dtype()
147    }
148
149    /// Returns the encoding ID of the array.
150    #[inline]
151    pub fn encoding_id(&self) -> ArrayId {
152        self.0.encoding_id()
153    }
154
155    /// Performs a constant-time slice of the array.
156    pub fn slice(&self, range: Range<usize>) -> VortexResult<ArrayRef> {
157        let len = self.len();
158        let start = range.start;
159        let stop = range.end;
160
161        if start == 0 && stop == len {
162            return Ok(self.clone());
163        }
164
165        vortex_ensure!(start <= len, "OutOfBounds: start {start} > length {}", len);
166        vortex_ensure!(stop <= len, "OutOfBounds: stop {stop} > length {}", len);
167
168        vortex_ensure!(start <= stop, "start ({start}) must be <= stop ({stop})");
169
170        if start == stop {
171            return Ok(Canonical::empty(self.dtype()).into_array());
172        }
173
174        let sliced = SliceArray::try_new(self.clone(), range)?
175            .into_array()
176            .optimize()?;
177
178        // Propagate some stats from the original array to the sliced array.
179        if !sliced.is::<Constant>() {
180            self.statistics().with_iter(|iter| {
181                sliced.statistics().inherit(iter.filter(|(stat, value)| {
182                    matches!(
183                        stat,
184                        Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted
185                    ) && value.as_ref().as_exact().is_some_and(|v| {
186                        Scalar::try_new(DType::Bool(Nullability::NonNullable), Some(v.clone()))
187                            .vortex_expect("A stat that was expected to be a boolean stat was not")
188                            .as_bool()
189                            .value()
190                            .unwrap_or_default()
191                    })
192                }));
193            });
194        }
195
196        Ok(sliced)
197    }
198
199    /// Wraps the array in a [`FilterArray`] such that it is logically filtered by the given mask.
200    pub fn filter(&self, mask: Mask) -> VortexResult<ArrayRef> {
201        FilterArray::try_new(self.clone(), mask)?
202            .into_array()
203            .optimize()
204    }
205
206    /// Wraps the array in a [`DictArray`] such that it is logically taken by the given indices.
207    pub fn take(&self, indices: ArrayRef) -> VortexResult<ArrayRef> {
208        DictArray::try_new(indices, self.clone())?
209            .into_array()
210            .optimize()
211    }
212
213    /// Fetch the scalar at the given index.
214    pub fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
215        vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
216        if self.is_invalid(index)? {
217            return Ok(Scalar::null(self.dtype().clone()));
218        }
219        let scalar = self.0.scalar_at(self, index)?;
220        vortex_ensure!(self.dtype() == scalar.dtype(), "Scalar dtype mismatch");
221        Ok(scalar)
222    }
223
224    /// Returns whether the item at `index` is valid.
225    pub fn is_valid(&self, index: usize) -> VortexResult<bool> {
226        vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
227        match self.validity()? {
228            Validity::NonNullable | Validity::AllValid => Ok(true),
229            Validity::AllInvalid => Ok(false),
230            Validity::Array(a) => a
231                .scalar_at(index)?
232                .as_bool()
233                .value()
234                .ok_or_else(|| vortex_err!("validity value at index {} is null", index)),
235        }
236    }
237
238    /// Returns whether the item at `index` is invalid.
239    pub fn is_invalid(&self, index: usize) -> VortexResult<bool> {
240        Ok(!self.is_valid(index)?)
241    }
242
243    /// Returns whether all items in the array are valid.
244    pub fn all_valid(&self) -> VortexResult<bool> {
245        match self.validity()? {
246            Validity::NonNullable | Validity::AllValid => Ok(true),
247            Validity::AllInvalid => Ok(false),
248            Validity::Array(a) => Ok(a.statistics().compute_min::<bool>().unwrap_or(false)),
249        }
250    }
251
252    /// Returns whether the array is all invalid.
253    pub fn all_invalid(&self) -> VortexResult<bool> {
254        match self.validity()? {
255            Validity::NonNullable | Validity::AllValid => Ok(false),
256            Validity::AllInvalid => Ok(true),
257            Validity::Array(a) => Ok(!a.statistics().compute_max::<bool>().unwrap_or(true)),
258        }
259    }
260
261    /// Returns the number of valid elements in the array.
262    pub fn valid_count(&self) -> VortexResult<usize> {
263        let len = self.len();
264        if let Some(Precision::Exact(invalid_count)) =
265            self.statistics().get_as::<usize>(Stat::NullCount)
266        {
267            return Ok(len - invalid_count);
268        }
269
270        let count = match self.validity()? {
271            Validity::NonNullable | Validity::AllValid => len,
272            Validity::AllInvalid => 0,
273            Validity::Array(a) => {
274                let mut ctx = LEGACY_SESSION.create_execution_ctx();
275                let array_sum = sum(&a, &mut ctx)?;
276                array_sum
277                    .as_primitive()
278                    .as_::<usize>()
279                    .ok_or_else(|| vortex_err!("sum of validity array is null"))?
280            }
281        };
282        vortex_ensure!(count <= len, "Valid count exceeds array length");
283
284        self.statistics()
285            .set(Stat::NullCount, Precision::exact(len - count));
286
287        Ok(count)
288    }
289
290    /// Returns the number of invalid elements in the array.
291    pub fn invalid_count(&self) -> VortexResult<usize> {
292        Ok(self.len() - self.valid_count()?)
293    }
294
295    /// Returns the [`Validity`] of the array.
296    pub fn validity(&self) -> VortexResult<Validity> {
297        self.0.validity(self)
298    }
299
300    /// Returns the canonical validity mask for the array.
301    pub fn validity_mask(&self) -> VortexResult<Mask> {
302        match self.validity()? {
303            Validity::NonNullable | Validity::AllValid => Ok(Mask::new_true(self.len())),
304            Validity::AllInvalid => Ok(Mask::new_false(self.len())),
305            Validity::Array(a) => Ok(a.to_bool().to_mask()),
306        }
307    }
308
309    /// Returns the canonical representation of the array.
310    pub fn into_canonical(self) -> VortexResult<Canonical> {
311        self.execute(&mut LEGACY_SESSION.create_execution_ctx())
312    }
313
314    /// Returns the canonical representation of the array.
315    pub fn to_canonical(&self) -> VortexResult<Canonical> {
316        self.clone().into_canonical()
317    }
318
319    /// Writes the array into the canonical builder.
320    pub fn append_to_builder(
321        &self,
322        builder: &mut dyn ArrayBuilder,
323        ctx: &mut ExecutionCtx,
324    ) -> VortexResult<()> {
325        self.0.append_to_builder(self, builder, ctx)
326    }
327
328    /// Returns the statistics of the array.
329    pub fn statistics(&self) -> StatsSetRef<'_> {
330        self.0.statistics().to_ref(self)
331    }
332
333    /// Does the array match the given matcher.
334    pub fn is<M: Matcher>(&self) -> bool {
335        M::matches(self)
336    }
337
338    /// Returns the array downcast by the given matcher.
339    pub fn as_<M: Matcher>(&self) -> M::Match<'_> {
340        self.as_opt::<M>().vortex_expect("Failed to downcast")
341    }
342
343    /// Returns the array downcast by the given matcher.
344    pub fn as_opt<M: Matcher>(&self) -> Option<M::Match<'_>> {
345        M::try_match(self)
346    }
347
348    /// Returns the array downcast to the given `Array<V>` as an owned typed handle.
349    pub fn try_downcast<V: VTable>(self) -> Result<Array<V>, ArrayRef> {
350        Array::<V>::try_from_array_ref(self)
351    }
352
353    /// Returns the array downcast to the given `Array<V>` as an owned typed handle.
354    ///
355    /// # Panics
356    ///
357    /// Panics if the array is not of the given type.
358    pub fn downcast<V: VTable>(self) -> Array<V> {
359        Self::try_downcast(self)
360            .unwrap_or_else(|_| vortex_panic!("Failed to downcast to {}", type_name::<V>()))
361    }
362
363    /// Returns a reference to the typed `ArrayInner<V>` if this array matches the given vtable type.
364    pub fn as_typed<V: VTable>(&self) -> Option<ArrayView<'_, V>> {
365        let inner = self.0.as_any().downcast_ref::<ArrayInner<V>>()?;
366        Some(unsafe { ArrayView::new_unchecked(self, &inner.data) })
367    }
368
369    /// Returns the constant scalar if this is a constant array.
370    pub fn as_constant(&self) -> Option<Scalar> {
371        self.as_opt::<Constant>().map(|a| a.scalar().clone())
372    }
373
374    /// Total size of the array in bytes, including all children and buffers.
375    pub fn nbytes(&self) -> u64 {
376        let mut nbytes = 0;
377        for array in self.depth_first_traversal() {
378            for buffer in array.buffers() {
379                nbytes += buffer.len() as u64;
380            }
381        }
382        nbytes
383    }
384
385    /// Returns whether this array is an arrow encoding.
386    pub fn is_arrow(&self) -> bool {
387        self.is::<Null>()
388            || self.is::<Bool>()
389            || self.is::<Primitive>()
390            || self.is::<VarBin>()
391            || self.is::<VarBinView>()
392    }
393
394    /// Whether the array is of a canonical encoding.
395    pub fn is_canonical(&self) -> bool {
396        self.is::<AnyCanonical>()
397    }
398
399    /// Returns a new array with the slot at `slot_idx` replaced by `replacement`.
400    ///
401    /// This is only valid for physical rewrites: the replacement must have the same logical
402    /// `DType` and `len` as the existing slot.
403    ///
404    /// Takes ownership to allow in-place mutation when the refcount is 1.
405    pub fn with_slot(self, slot_idx: usize, replacement: ArrayRef) -> VortexResult<ArrayRef> {
406        let slots = self.slots().to_vec();
407        let nslots = slots.len();
408        vortex_ensure!(
409            slot_idx < nslots,
410            "slot index {} out of bounds for array with {} slots",
411            slot_idx,
412            nslots
413        );
414        let existing = slots[slot_idx]
415            .as_ref()
416            .vortex_expect("with_slot cannot replace an absent slot");
417        vortex_ensure!(
418            existing.dtype() == replacement.dtype(),
419            "slot {} dtype changed from {} to {} during physical rewrite",
420            slot_idx,
421            existing.dtype(),
422            replacement.dtype()
423        );
424        vortex_ensure!(
425            existing.len() == replacement.len(),
426            "slot {} len changed from {} to {} during physical rewrite",
427            slot_idx,
428            existing.len(),
429            replacement.len()
430        );
431        let mut slots = slots;
432        slots[slot_idx] = Some(replacement);
433        self.with_slots(slots)
434    }
435
436    /// Returns a new array with the provided slots.
437    ///
438    /// This is only valid for physical rewrites: slot count, presence, logical `DType`, and
439    /// logical `len` must remain unchanged.
440    pub fn with_slots(self, slots: Vec<Option<ArrayRef>>) -> VortexResult<ArrayRef> {
441        let old_slots = self.slots();
442        vortex_ensure!(
443            old_slots.len() == slots.len(),
444            "slot count changed from {} to {} during physical rewrite",
445            old_slots.len(),
446            slots.len()
447        );
448        for (idx, (old_slot, new_slot)) in old_slots.iter().zip(slots.iter()).enumerate() {
449            vortex_ensure!(
450                old_slot.is_some() == new_slot.is_some(),
451                "slot {} presence changed during physical rewrite",
452                idx
453            );
454            if let (Some(old_slot), Some(new_slot)) = (old_slot.as_ref(), new_slot.as_ref()) {
455                vortex_ensure!(
456                    old_slot.dtype() == new_slot.dtype(),
457                    "slot {} dtype changed from {} to {} during physical rewrite",
458                    idx,
459                    old_slot.dtype(),
460                    new_slot.dtype()
461                );
462                vortex_ensure!(
463                    old_slot.len() == new_slot.len(),
464                    "slot {} len changed from {} to {} during physical rewrite",
465                    idx,
466                    old_slot.len(),
467                    new_slot.len()
468                );
469            }
470        }
471        let inner = Arc::clone(&self.0);
472        inner.with_slots(self, slots)
473    }
474
475    pub fn reduce(&self) -> VortexResult<Option<ArrayRef>> {
476        self.0.reduce(self)
477    }
478
479    pub fn reduce_parent(
480        &self,
481        parent: &ArrayRef,
482        child_idx: usize,
483    ) -> VortexResult<Option<ArrayRef>> {
484        self.0.reduce_parent(self, parent, child_idx)
485    }
486
487    pub(crate) fn execute_encoding(
488        self,
489        ctx: &mut ExecutionCtx,
490    ) -> VortexResult<crate::ExecutionResult> {
491        let inner = Arc::clone(&self.0);
492        inner.execute(self, ctx)
493    }
494
495    pub fn execute_parent(
496        &self,
497        parent: &ArrayRef,
498        child_idx: usize,
499        ctx: &mut ExecutionCtx,
500    ) -> VortexResult<Option<ArrayRef>> {
501        self.0.execute_parent(self, parent, child_idx, ctx)
502    }
503
504    // ArrayVisitor delegation methods
505
506    /// Returns the children of the array.
507    pub fn children(&self) -> Vec<ArrayRef> {
508        self.0.children(self)
509    }
510
511    /// Returns the number of children of the array.
512    pub fn nchildren(&self) -> usize {
513        self.0.nchildren(self)
514    }
515
516    /// Returns the nth child of the array without allocating a Vec.
517    pub fn nth_child(&self, idx: usize) -> Option<ArrayRef> {
518        self.0.nth_child(self, idx)
519    }
520
521    /// Returns the names of the children of the array.
522    pub fn children_names(&self) -> Vec<String> {
523        self.0.children_names(self)
524    }
525
526    /// Returns the array's children with their names.
527    pub fn named_children(&self) -> Vec<(String, ArrayRef)> {
528        self.0.named_children(self)
529    }
530
531    /// Returns the data buffers of the array.
532    pub fn buffers(&self) -> Vec<ByteBuffer> {
533        self.0.buffers(self)
534    }
535
536    /// Returns the buffer handles of the array.
537    pub fn buffer_handles(&self) -> Vec<BufferHandle> {
538        self.0.buffer_handles(self)
539    }
540
541    /// Returns the names of the buffers of the array.
542    pub fn buffer_names(&self) -> Vec<String> {
543        self.0.buffer_names(self)
544    }
545
546    /// Returns the array's buffers with their names.
547    pub fn named_buffers(&self) -> Vec<(String, BufferHandle)> {
548        self.0.named_buffers(self)
549    }
550
551    /// Returns the number of data buffers of the array.
552    pub fn nbuffers(&self) -> usize {
553        self.0.nbuffers(self)
554    }
555
556    /// Returns the slots of the array.
557    pub fn slots(&self) -> &[Option<ArrayRef>] {
558        self.0.slots()
559    }
560
561    /// Returns the name of the slot at the given index.
562    pub fn slot_name(&self, idx: usize) -> String {
563        self.0.slot_name(self, idx)
564    }
565
566    /// Returns the serialized metadata of the array.
567    pub fn metadata(&self, session: &VortexSession) -> VortexResult<Option<Vec<u8>>> {
568        self.0.metadata(self, session)
569    }
570
571    /// Formats a human-readable metadata description.
572    pub fn metadata_fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
573        self.0.metadata_fmt(f)
574    }
575
576    /// Returns whether all buffers are host-resident.
577    pub fn is_host(&self) -> bool {
578        for array in self.depth_first_traversal() {
579            if !array.buffer_handles().iter().all(BufferHandle::is_on_host) {
580                return false;
581            }
582        }
583        true
584    }
585
586    // ArrayVisitorExt delegation methods
587
588    /// Count the number of buffers encoded by self and all child arrays.
589    pub fn nbuffers_recursive(&self) -> usize {
590        self.children()
591            .iter()
592            .map(|c| c.nbuffers_recursive())
593            .sum::<usize>()
594            + self.nbuffers()
595    }
596
597    /// Depth-first traversal of the array and its children.
598    pub fn depth_first_traversal(&self) -> DepthFirstArrayIterator {
599        DepthFirstArrayIterator {
600            stack: vec![self.clone()],
601        }
602    }
603}
604
605impl IntoArray for ArrayRef {
606    #[inline(always)]
607    fn into_array(self) -> ArrayRef {
608        self
609    }
610}
611
612impl<V: VTable> Matcher for V {
613    type Match<'a> = ArrayView<'a, V>;
614
615    fn matches(array: &ArrayRef) -> bool {
616        array.0.as_any().is::<ArrayInner<V>>()
617    }
618
619    fn try_match<'a>(array: &'a ArrayRef) -> Option<ArrayView<'a, V>> {
620        let inner = array.0.as_any().downcast_ref::<ArrayInner<V>>()?;
621        Some(unsafe { ArrayView::new_unchecked(array, &inner.data) })
622    }
623}