vortex_array/array/
operator.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use vortex_error::{VortexResult, vortex_panic};
7use vortex_mask::Mask;
8use vortex_vector::{Vector, VectorOps, vector_matches_dtype};
9
10use crate::execution::{BatchKernelRef, BindCtx, DummyExecutionCtx, ExecutionCtx};
11use crate::pipeline::PipelinedNode;
12use crate::pipeline::driver::PipelineDriver;
13use crate::vtable::{OperatorVTable, VTable};
14use crate::{Array, ArrayAdapter, ArrayRef};
15
/// Array functions as provided by the `OperatorVTable`.
///
/// This file implements the trait for `Arc<dyn Array>` (forwarding to the inner array) and for
/// `ArrayAdapter<V>` (dispatching to `V::OperatorVTable`).
///
/// Note: the public functions such as `execute` should move onto the main `Array` trait when
/// operators are stabilized. The other functions should remain on a `pub(crate)` trait.
pub trait ArrayOperator: 'static + Send + Sync {
    /// Execute the array's batch kernel with the given selection mask.
    ///
    /// The `ArrayAdapter` implementation always asserts that the returned vector holds exactly
    /// `selection.true_count()` elements; its type/nullability check against the array's dtype
    /// only runs when debug assertions are enabled.
    ///
    /// # Panics
    ///
    /// If the mask length does not match the array length.
    /// If the array's implementation returns an invalid vector (wrong length, wrong type, etc.).
    fn execute_batch(&self, selection: &Mask, ctx: &mut dyn ExecutionCtx) -> VortexResult<Vector>;

    /// Optimize the array by running the optimization rules.
    ///
    /// NOTE(review): `Ok(None)` presumably means that no rewrite was applicable -- confirm
    /// against the `OperatorVTable` documentation.
    fn reduce_children(&self) -> VortexResult<Option<ArrayRef>>;

    /// Optimize the array by pushing down a parent array.
    ///
    /// NOTE(review): `child_idx` is presumably the position of this array among the children
    /// of `parent` -- confirm against the `OperatorVTable` documentation.
    fn reduce_parent(&self, parent: &ArrayRef, child_idx: usize) -> VortexResult<Option<ArrayRef>>;

    /// Returns the array as a pipeline node, if supported.
    ///
    /// `execute_with_selection` on `dyn Array` uses a `Some` result to route execution through
    /// the `PipelineDriver` instead of the batch kernel.
    fn as_pipelined(&self) -> Option<&dyn PipelinedNode>;

    /// Bind the array to a batch kernel. This is an internal function.
    ///
    /// `selection` is an optional selection array and `ctx` is the binding context handed to
    /// the array's `OperatorVTable::bind` implementation.
    fn bind(
        &self,
        selection: Option<&ArrayRef>,
        ctx: &mut dyn BindCtx,
    ) -> VortexResult<BatchKernelRef>;
}
45
46impl ArrayOperator for Arc<dyn Array> {
47    fn execute_batch(&self, selection: &Mask, ctx: &mut dyn ExecutionCtx) -> VortexResult<Vector> {
48        self.as_ref().execute_batch(selection, ctx)
49    }
50
51    fn reduce_children(&self) -> VortexResult<Option<ArrayRef>> {
52        self.as_ref().reduce_children()
53    }
54
55    fn reduce_parent(&self, parent: &ArrayRef, child_idx: usize) -> VortexResult<Option<ArrayRef>> {
56        self.as_ref().reduce_parent(parent, child_idx)
57    }
58
59    fn as_pipelined(&self) -> Option<&dyn PipelinedNode> {
60        self.as_ref().as_pipelined()
61    }
62
63    fn bind(
64        &self,
65        selection: Option<&ArrayRef>,
66        ctx: &mut dyn BindCtx,
67    ) -> VortexResult<BatchKernelRef> {
68        self.as_ref().bind(selection, ctx)
69    }
70}
71
72impl<V: VTable> ArrayOperator for ArrayAdapter<V> {
73    fn execute_batch(&self, selection: &Mask, ctx: &mut dyn ExecutionCtx) -> VortexResult<Vector> {
74        let vector =
75            <V::OperatorVTable as OperatorVTable<V>>::execute_batch(&self.0, selection, ctx)?;
76
77        // Such a cheap check that we run it always. More expensive DType checks live in
78        // debug_assertions.
79        assert_eq!(
80            vector.len(),
81            selection.true_count(),
82            "Batch execution returned vector of incorrect length"
83        );
84
85        if cfg!(debug_assertions) {
86            // Checks for correct type and nullability.
87            if !vector_matches_dtype(&vector, self.dtype()) {
88                vortex_panic!(
89                    "Returned vector {:?} does not match expected dtype {}",
90                    vector,
91                    self.dtype()
92                );
93            }
94        }
95
96        Ok(vector)
97    }
98
99    fn reduce_children(&self) -> VortexResult<Option<ArrayRef>> {
100        <V::OperatorVTable as OperatorVTable<V>>::reduce_children(&self.0)
101    }
102
103    fn reduce_parent(&self, parent: &ArrayRef, child_idx: usize) -> VortexResult<Option<ArrayRef>> {
104        <V::OperatorVTable as OperatorVTable<V>>::reduce_parent(&self.0, parent, child_idx)
105    }
106
107    fn as_pipelined(&self) -> Option<&dyn PipelinedNode> {
108        <V::OperatorVTable as OperatorVTable<V>>::pipeline_node(&self.0)
109    }
110
111    fn bind(
112        &self,
113        selection: Option<&ArrayRef>,
114        ctx: &mut dyn BindCtx,
115    ) -> VortexResult<BatchKernelRef> {
116        <V::OperatorVTable as OperatorVTable<V>>::bind(&self.0, selection, ctx)
117    }
118}
119
120// TODO(ngates): create a smarter context in the future
121impl BindCtx for () {
122    fn bind(
123        &mut self,
124        array: &ArrayRef,
125        selection: Option<&ArrayRef>,
126    ) -> VortexResult<BatchKernelRef> {
127        array.bind(selection, self)
128    }
129}
130
131impl dyn Array + '_ {
132    pub fn execute(&self) -> VortexResult<Vector> {
133        self.execute_with_selection(&Mask::new_true(self.len()))
134    }
135
136    pub fn execute_with_selection(&self, selection: &Mask) -> VortexResult<Vector> {
137        assert_eq!(self.len(), selection.len());
138
139        // Check if the array is a pipeline node
140        if self.as_pipelined().is_some() {
141            return PipelineDriver::new(self.to_array()).execute(selection);
142        }
143
144        self.execute_batch(selection, &mut DummyExecutionCtx)
145    }
146}