Skip to main content

vortex_array/array/
erased.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::type_name;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::hash::Hasher;
8use std::ops::Range;
9use std::sync::Arc;
10
11use vortex_buffer::ByteBuffer;
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_error::vortex_ensure;
15use vortex_error::vortex_err;
16use vortex_error::vortex_panic;
17use vortex_mask::Mask;
18
19use crate::AnyCanonical;
20use crate::Array;
21use crate::ArrayEq;
22use crate::ArrayHash;
23use crate::ArrayView;
24use crate::Canonical;
25use crate::ExecutionCtx;
26use crate::IntoArray;
27use crate::LEGACY_SESSION;
28use crate::VTable;
29use crate::VortexSessionExecute;
30use crate::aggregate_fn::fns::sum::sum;
31use crate::array::ArrayId;
32use crate::array::ArrayInner;
33use crate::array::DynArray;
34use crate::arrays::Bool;
35use crate::arrays::Constant;
36use crate::arrays::DictArray;
37use crate::arrays::FilterArray;
38use crate::arrays::Null;
39use crate::arrays::Primitive;
40use crate::arrays::SliceArray;
41use crate::arrays::VarBin;
42use crate::arrays::VarBinView;
43use crate::buffer::BufferHandle;
44use crate::builders::ArrayBuilder;
45use crate::dtype::DType;
46use crate::dtype::Nullability;
47use crate::expr::stats::Precision;
48use crate::expr::stats::Stat;
49use crate::expr::stats::StatsProviderExt;
50use crate::matcher::Matcher;
51use crate::optimizer::ArrayOptimizer;
52use crate::scalar::Scalar;
53use crate::stats::StatsSetRef;
54use crate::validity::Validity;
55
56/// A depth-first pre-order iterator over an Array.
57pub struct DepthFirstArrayIterator {
58    stack: Vec<ArrayRef>,
59}
60
61impl Iterator for DepthFirstArrayIterator {
62    type Item = ArrayRef;
63
64    fn next(&mut self) -> Option<Self::Item> {
65        let next = self.stack.pop()?;
66        for child in next.children().into_iter().rev() {
67            self.stack.push(child);
68        }
69        Some(next)
70    }
71}
72
73/// A reference-counted pointer to a type-erased array.
74#[derive(Clone)]
75pub struct ArrayRef(Arc<dyn DynArray>);
76
77impl ArrayRef {
78    /// Create from an `Arc<dyn DynArray>`.
79    pub(crate) fn from_inner(inner: Arc<dyn DynArray>) -> Self {
80        Self(inner)
81    }
82
83    /// Returns the Arc::as_ptr().addr() of the underlying array.
84    /// This function is used in a couple of places, and we should migrate them to using array_eq.
85    #[doc(hidden)]
86    pub fn addr(&self) -> usize {
87        Arc::as_ptr(&self.0).addr()
88    }
89
90    /// Returns a reference to the inner Arc.
91    #[inline(always)]
92    pub(crate) fn inner(&self) -> &Arc<dyn DynArray> {
93        &self.0
94    }
95
96    /// Consumes the array reference, returning the owned backing allocation.
97    #[inline(always)]
98    pub(crate) fn into_inner(self) -> Arc<dyn DynArray> {
99        self.0
100    }
101
102    /// Returns true if the two ArrayRefs point to the same allocation.
103    pub fn ptr_eq(this: &ArrayRef, other: &ArrayRef) -> bool {
104        Arc::ptr_eq(&this.0, &other.0)
105    }
106}
107
108impl Debug for ArrayRef {
109    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
110        Debug::fmt(&*self.0, f)
111    }
112}
113
114impl ArrayHash for ArrayRef {
115    fn array_hash<H: Hasher>(&self, state: &mut H, precision: crate::Precision) {
116        self.0.dyn_array_hash(state as &mut dyn Hasher, precision);
117    }
118}
119
120impl ArrayEq for ArrayRef {
121    fn array_eq(&self, other: &Self, precision: crate::Precision) -> bool {
122        self.0.dyn_array_eq(other, precision)
123    }
124}
125impl ArrayRef {
126    /// Returns the length of the array.
127    #[inline]
128    pub fn len(&self) -> usize {
129        self.0.len()
130    }
131
132    /// Returns whether the array is empty (has zero rows).
133    #[inline]
134    pub fn is_empty(&self) -> bool {
135        self.0.len() == 0
136    }
137
138    /// Returns the logical Vortex [`DType`] of the array.
139    #[inline]
140    pub fn dtype(&self) -> &DType {
141        self.0.dtype()
142    }
143
144    /// Returns the encoding ID of the array.
145    #[inline]
146    pub fn encoding_id(&self) -> ArrayId {
147        self.0.encoding_id()
148    }
149
150    /// Performs a constant-time slice of the array.
151    pub fn slice(&self, range: Range<usize>) -> VortexResult<ArrayRef> {
152        let len = self.len();
153        let start = range.start;
154        let stop = range.end;
155
156        if start == 0 && stop == len {
157            return Ok(self.clone());
158        }
159
160        vortex_ensure!(start <= len, "OutOfBounds: start {start} > length {}", len);
161        vortex_ensure!(stop <= len, "OutOfBounds: stop {stop} > length {}", len);
162
163        vortex_ensure!(start <= stop, "start ({start}) must be <= stop ({stop})");
164
165        if start == stop {
166            return Ok(Canonical::empty(self.dtype()).into_array());
167        }
168
169        let sliced = SliceArray::try_new(self.clone(), range)?
170            .into_array()
171            .optimize()?;
172
173        // Propagate some stats from the original array to the sliced array.
174        if !sliced.is::<Constant>() {
175            self.statistics().with_iter(|iter| {
176                sliced.statistics().inherit(iter.filter(|(stat, value)| {
177                    matches!(
178                        stat,
179                        Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted
180                    ) && value.as_ref().as_exact().is_some_and(|v| {
181                        Scalar::try_new(DType::Bool(Nullability::NonNullable), Some(v.clone()))
182                            .vortex_expect("A stat that was expected to be a boolean stat was not")
183                            .as_bool()
184                            .value()
185                            .unwrap_or_default()
186                    })
187                }));
188            });
189        }
190
191        Ok(sliced)
192    }
193
194    /// Wraps the array in a [`FilterArray`] such that it is logically filtered by the given mask.
195    pub fn filter(&self, mask: Mask) -> VortexResult<ArrayRef> {
196        FilterArray::try_new(self.clone(), mask)?
197            .into_array()
198            .optimize()
199    }
200
201    /// Wraps the array in a [`DictArray`] such that it is logically taken by the given indices.
202    pub fn take(&self, indices: ArrayRef) -> VortexResult<ArrayRef> {
203        DictArray::try_new(indices, self.clone())?
204            .into_array()
205            .optimize()
206    }
207
208    /// Fetch the scalar at the given index.
209    #[deprecated(
210        note = "Use `execute_scalar` instead, which allows passing an execution context for more \
211        efficient execution when fetching multiple scalars from the same array."
212    )]
213    pub fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
214        self.execute_scalar(index, &mut LEGACY_SESSION.create_execution_ctx())
215    }
216
217    /// Execute the array to extract a scalar at the given index.
218    pub fn execute_scalar(&self, index: usize, ctx: &mut ExecutionCtx) -> VortexResult<Scalar> {
219        vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
220        if self.is_invalid(index, ctx)? {
221            return Ok(Scalar::null(self.dtype().clone()));
222        }
223        let scalar = self.0.execute_scalar(self, index, ctx)?;
224        vortex_ensure!(self.dtype() == scalar.dtype(), "Scalar dtype mismatch");
225        Ok(scalar)
226    }
227
228    /// Returns whether the item at `index` is valid.
229    pub fn is_valid(&self, index: usize, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
230        vortex_ensure!(index < self.len(), OutOfBounds: index, 0, self.len());
231        match self.validity()? {
232            Validity::NonNullable | Validity::AllValid => Ok(true),
233            Validity::AllInvalid => Ok(false),
234            Validity::Array(a) => a
235                .execute_scalar(index, ctx)?
236                .as_bool()
237                .value()
238                .ok_or_else(|| vortex_err!("validity value at index {} is null", index)),
239        }
240    }
241
242    /// Returns whether the item at `index` is invalid.
243    pub fn is_invalid(&self, index: usize, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
244        Ok(!self.is_valid(index, ctx)?)
245    }
246
247    /// Returns whether all items in the array are valid.
248    pub fn all_valid(&self, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
249        match self.validity()? {
250            Validity::NonNullable | Validity::AllValid => Ok(true),
251            Validity::AllInvalid => Ok(false),
252            Validity::Array(a) => Ok(a.statistics().compute_min::<bool>(ctx).unwrap_or(false)),
253        }
254    }
255
256    /// Returns whether the array is all invalid.
257    pub fn all_invalid(&self, ctx: &mut ExecutionCtx) -> VortexResult<bool> {
258        match self.validity()? {
259            Validity::NonNullable | Validity::AllValid => Ok(false),
260            Validity::AllInvalid => Ok(true),
261            Validity::Array(a) => Ok(!a.statistics().compute_max::<bool>(ctx).unwrap_or(true)),
262        }
263    }
264
265    /// Returns the number of valid elements in the array.
266    pub fn valid_count(&self, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
267        let len = self.len();
268        if let Some(Precision::Exact(invalid_count)) =
269            self.statistics().get_as::<usize>(Stat::NullCount)
270        {
271            return Ok(len - invalid_count);
272        }
273
274        let count = match self.validity()? {
275            Validity::NonNullable | Validity::AllValid => len,
276            Validity::AllInvalid => 0,
277            Validity::Array(a) => {
278                let array_sum = sum(&a, ctx)?;
279                array_sum
280                    .as_primitive()
281                    .as_::<usize>()
282                    .ok_or_else(|| vortex_err!("sum of validity array is null"))?
283            }
284        };
285        vortex_ensure!(count <= len, "Valid count exceeds array length");
286
287        self.statistics()
288            .set(Stat::NullCount, Precision::exact(len - count));
289
290        Ok(count)
291    }
292
293    /// Returns the number of invalid elements in the array.
294    pub fn invalid_count(&self, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
295        Ok(self.len() - self.valid_count(ctx)?)
296    }
297
298    /// Returns the [`Validity`] of the array.
299    pub fn validity(&self) -> VortexResult<Validity> {
300        self.0.validity(self)
301    }
302
303    /// Returns the canonical representation of the array.
304    pub fn into_canonical(self) -> VortexResult<Canonical> {
305        self.execute(&mut LEGACY_SESSION.create_execution_ctx())
306    }
307
308    /// Returns the canonical representation of the array.
309    pub fn to_canonical(&self) -> VortexResult<Canonical> {
310        self.clone().into_canonical()
311    }
312
313    /// Writes the array into the canonical builder.
314    pub fn append_to_builder(
315        &self,
316        builder: &mut dyn ArrayBuilder,
317        ctx: &mut ExecutionCtx,
318    ) -> VortexResult<()> {
319        self.0.append_to_builder(self, builder, ctx)
320    }
321
322    /// Returns the statistics of the array.
323    pub fn statistics(&self) -> StatsSetRef<'_> {
324        self.0.statistics().to_ref(self)
325    }
326
327    /// Does the array match the given matcher.
328    pub fn is<M: Matcher>(&self) -> bool {
329        M::matches(self)
330    }
331
332    /// Returns the array downcast by the given matcher.
333    pub fn as_<M: Matcher>(&self) -> M::Match<'_> {
334        self.as_opt::<M>().vortex_expect("Failed to downcast")
335    }
336
337    /// Returns the array downcast by the given matcher.
338    pub fn as_opt<M: Matcher>(&self) -> Option<M::Match<'_>> {
339        M::try_match(self)
340    }
341
342    /// Returns the array downcast to the given `Array<V>` as an owned typed handle.
343    pub fn try_downcast<V: VTable>(self) -> Result<Array<V>, ArrayRef> {
344        Array::<V>::try_from_array_ref(self)
345    }
346
347    /// Returns the array downcast to the given `Array<V>` as an owned typed handle.
348    ///
349    /// # Panics
350    ///
351    /// Panics if the array is not of the given type.
352    pub fn downcast<V: VTable>(self) -> Array<V> {
353        Self::try_downcast(self)
354            .unwrap_or_else(|_| vortex_panic!("Failed to downcast to {}", type_name::<V>()))
355    }
356
357    /// Returns a reference to the typed `ArrayInner<V>` if this array matches the given vtable type.
358    pub fn as_typed<V: VTable>(&self) -> Option<ArrayView<'_, V>> {
359        let inner = self.0.as_any().downcast_ref::<ArrayInner<V>>()?;
360        Some(unsafe { ArrayView::new_unchecked(self, &inner.data) })
361    }
362
363    /// Returns the constant scalar if this is a constant array.
364    pub fn as_constant(&self) -> Option<Scalar> {
365        self.as_opt::<Constant>().map(|a| a.scalar().clone())
366    }
367
368    /// Total size of the array in bytes, including all children and buffers.
369    pub fn nbytes(&self) -> u64 {
370        let mut nbytes = 0;
371        for array in self.depth_first_traversal() {
372            for buffer in array.buffers() {
373                nbytes += buffer.len() as u64;
374            }
375        }
376        nbytes
377    }
378
379    /// Returns whether this array is an arrow encoding.
380    pub fn is_arrow(&self) -> bool {
381        self.is::<Null>()
382            || self.is::<Bool>()
383            || self.is::<Primitive>()
384            || self.is::<VarBin>()
385            || self.is::<VarBinView>()
386    }
387
388    /// Whether the array is of a canonical encoding.
389    pub fn is_canonical(&self) -> bool {
390        self.is::<AnyCanonical>()
391    }
392
393    /// Returns a new array with the slot at `slot_idx` replaced by `replacement`.
394    ///
395    /// This is only valid for physical rewrites: the replacement must have the same logical
396    /// `DType` and `len` as the existing slot.
397    ///
398    /// Takes ownership to allow in-place mutation when the refcount is 1.
399    pub fn with_slot(self, slot_idx: usize, replacement: ArrayRef) -> VortexResult<ArrayRef> {
400        let slots = self.slots().to_vec();
401        let nslots = slots.len();
402        vortex_ensure!(
403            slot_idx < nslots,
404            "slot index {} out of bounds for array with {} slots",
405            slot_idx,
406            nslots
407        );
408        let existing = slots[slot_idx]
409            .as_ref()
410            .vortex_expect("with_slot cannot replace an absent slot");
411        vortex_ensure!(
412            existing.dtype() == replacement.dtype(),
413            "slot {} dtype changed from {} to {} during physical rewrite",
414            slot_idx,
415            existing.dtype(),
416            replacement.dtype()
417        );
418        vortex_ensure!(
419            existing.len() == replacement.len(),
420            "slot {} len changed from {} to {} during physical rewrite",
421            slot_idx,
422            existing.len(),
423            replacement.len()
424        );
425        let mut slots = slots;
426        slots[slot_idx] = Some(replacement);
427        self.with_slots(slots)
428    }
429
430    /// Returns a new array with the provided slots.
431    ///
432    /// This is only valid for physical rewrites: slot count, presence, logical `DType`, and
433    /// logical `len` must remain unchanged.
434    pub fn with_slots(self, slots: Vec<Option<ArrayRef>>) -> VortexResult<ArrayRef> {
435        let old_slots = self.slots();
436        vortex_ensure!(
437            old_slots.len() == slots.len(),
438            "slot count changed from {} to {} during physical rewrite",
439            old_slots.len(),
440            slots.len()
441        );
442        for (idx, (old_slot, new_slot)) in old_slots.iter().zip(slots.iter()).enumerate() {
443            vortex_ensure!(
444                old_slot.is_some() == new_slot.is_some(),
445                "slot {} presence changed during physical rewrite",
446                idx
447            );
448            if let (Some(old_slot), Some(new_slot)) = (old_slot.as_ref(), new_slot.as_ref()) {
449                vortex_ensure!(
450                    old_slot.dtype() == new_slot.dtype(),
451                    "slot {} dtype changed from {} to {} during physical rewrite",
452                    idx,
453                    old_slot.dtype(),
454                    new_slot.dtype()
455                );
456                vortex_ensure!(
457                    old_slot.len() == new_slot.len(),
458                    "slot {} len changed from {} to {} during physical rewrite",
459                    idx,
460                    old_slot.len(),
461                    new_slot.len()
462                );
463            }
464        }
465        let inner = Arc::clone(&self.0);
466        inner.with_slots(self, slots)
467    }
468
469    pub fn reduce(&self) -> VortexResult<Option<ArrayRef>> {
470        self.0.reduce(self)
471    }
472
473    pub fn reduce_parent(
474        &self,
475        parent: &ArrayRef,
476        child_idx: usize,
477    ) -> VortexResult<Option<ArrayRef>> {
478        self.0.reduce_parent(self, parent, child_idx)
479    }
480
481    pub(crate) fn execute_encoding(
482        self,
483        ctx: &mut ExecutionCtx,
484    ) -> VortexResult<crate::ExecutionResult> {
485        let inner = Arc::clone(&self.0);
486        inner.execute(self, ctx)
487    }
488
489    pub fn execute_parent(
490        &self,
491        parent: &ArrayRef,
492        child_idx: usize,
493        ctx: &mut ExecutionCtx,
494    ) -> VortexResult<Option<ArrayRef>> {
495        self.0.execute_parent(self, parent, child_idx, ctx)
496    }
497
498    // ArrayVisitor delegation methods
499
500    /// Returns the children of the array.
501    pub fn children(&self) -> Vec<ArrayRef> {
502        self.0.children(self)
503    }
504
505    /// Returns the number of children of the array.
506    pub fn nchildren(&self) -> usize {
507        self.0.nchildren(self)
508    }
509
510    /// Returns the nth child of the array without allocating a Vec.
511    pub fn nth_child(&self, idx: usize) -> Option<ArrayRef> {
512        self.0.nth_child(self, idx)
513    }
514
515    /// Returns the names of the children of the array.
516    pub fn children_names(&self) -> Vec<String> {
517        self.0.children_names(self)
518    }
519
520    /// Returns the array's children with their names.
521    pub fn named_children(&self) -> Vec<(String, ArrayRef)> {
522        self.0.named_children(self)
523    }
524
525    /// Returns the data buffers of the array.
526    pub fn buffers(&self) -> Vec<ByteBuffer> {
527        self.0.buffers(self)
528    }
529
530    /// Returns the buffer handles of the array.
531    pub fn buffer_handles(&self) -> Vec<BufferHandle> {
532        self.0.buffer_handles(self)
533    }
534
535    /// Returns the names of the buffers of the array.
536    pub fn buffer_names(&self) -> Vec<String> {
537        self.0.buffer_names(self)
538    }
539
540    /// Returns the array's buffers with their names.
541    pub fn named_buffers(&self) -> Vec<(String, BufferHandle)> {
542        self.0.named_buffers(self)
543    }
544
545    /// Returns the number of data buffers of the array.
546    pub fn nbuffers(&self) -> usize {
547        self.0.nbuffers(self)
548    }
549
550    /// Returns the slots of the array.
551    pub fn slots(&self) -> &[Option<ArrayRef>] {
552        self.0.slots()
553    }
554
555    /// Returns the name of the slot at the given index.
556    pub fn slot_name(&self, idx: usize) -> String {
557        self.0.slot_name(self, idx)
558    }
559
560    /// Formats a human-readable metadata description.
561    pub fn metadata_fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
562        self.0.metadata_fmt(f)
563    }
564
565    /// Returns whether all buffers are host-resident.
566    pub fn is_host(&self) -> bool {
567        for array in self.depth_first_traversal() {
568            if !array.buffer_handles().iter().all(BufferHandle::is_on_host) {
569                return false;
570            }
571        }
572        true
573    }
574
575    // ArrayVisitorExt delegation methods
576
577    /// Count the number of buffers encoded by self and all child arrays.
578    pub fn nbuffers_recursive(&self) -> usize {
579        self.children()
580            .iter()
581            .map(|c| c.nbuffers_recursive())
582            .sum::<usize>()
583            + self.nbuffers()
584    }
585
586    /// Depth-first traversal of the array and its children.
587    pub fn depth_first_traversal(&self) -> DepthFirstArrayIterator {
588        DepthFirstArrayIterator {
589            stack: vec![self.clone()],
590        }
591    }
592}
593
594impl IntoArray for ArrayRef {
595    #[inline(always)]
596    fn into_array(self) -> ArrayRef {
597        self
598    }
599}
600
601impl<V: VTable> Matcher for V {
602    type Match<'a> = ArrayView<'a, V>;
603
604    fn matches(array: &ArrayRef) -> bool {
605        array.0.as_any().is::<ArrayInner<V>>()
606    }
607
608    fn try_match<'a>(array: &'a ArrayRef) -> Option<ArrayView<'a, V>> {
609        let inner = array.0.as_any().downcast_ref::<ArrayInner<V>>()?;
610        Some(unsafe { ArrayView::new_unchecked(array, &inner.data) })
611    }
612}