vortex_array/array/
operator.rs1use std::sync::Arc;
5
6use vortex_error::{VortexResult, vortex_panic};
7use vortex_mask::Mask;
8use vortex_vector::{Vector, VectorOps, vector_matches_dtype};
9
10use crate::execution::{BatchKernelRef, BindCtx, DummyExecutionCtx, ExecutionCtx};
11use crate::pipeline::PipelinedNode;
12use crate::pipeline::driver::PipelineDriver;
13use crate::vtable::{OperatorVTable, VTable};
14use crate::{Array, ArrayAdapter, ArrayRef};
15
16pub trait ArrayOperator: 'static + Send + Sync {
21 fn execute_batch(&self, selection: &Mask, ctx: &mut dyn ExecutionCtx) -> VortexResult<Vector>;
28
29 fn reduce_children(&self) -> VortexResult<Option<ArrayRef>>;
31
32 fn reduce_parent(&self, parent: &ArrayRef, child_idx: usize) -> VortexResult<Option<ArrayRef>>;
34
35 fn as_pipelined(&self) -> Option<&dyn PipelinedNode>;
37
38 fn bind(
40 &self,
41 selection: Option<&ArrayRef>,
42 ctx: &mut dyn BindCtx,
43 ) -> VortexResult<BatchKernelRef>;
44}
45
46impl ArrayOperator for Arc<dyn Array> {
47 fn execute_batch(&self, selection: &Mask, ctx: &mut dyn ExecutionCtx) -> VortexResult<Vector> {
48 self.as_ref().execute_batch(selection, ctx)
49 }
50
51 fn reduce_children(&self) -> VortexResult<Option<ArrayRef>> {
52 self.as_ref().reduce_children()
53 }
54
55 fn reduce_parent(&self, parent: &ArrayRef, child_idx: usize) -> VortexResult<Option<ArrayRef>> {
56 self.as_ref().reduce_parent(parent, child_idx)
57 }
58
59 fn as_pipelined(&self) -> Option<&dyn PipelinedNode> {
60 self.as_ref().as_pipelined()
61 }
62
63 fn bind(
64 &self,
65 selection: Option<&ArrayRef>,
66 ctx: &mut dyn BindCtx,
67 ) -> VortexResult<BatchKernelRef> {
68 self.as_ref().bind(selection, ctx)
69 }
70}
71
72impl<V: VTable> ArrayOperator for ArrayAdapter<V> {
73 fn execute_batch(&self, selection: &Mask, ctx: &mut dyn ExecutionCtx) -> VortexResult<Vector> {
74 let vector =
75 <V::OperatorVTable as OperatorVTable<V>>::execute_batch(&self.0, selection, ctx)?;
76
77 assert_eq!(
80 vector.len(),
81 selection.true_count(),
82 "Batch execution returned vector of incorrect length"
83 );
84
85 if cfg!(debug_assertions) {
86 if !vector_matches_dtype(&vector, self.dtype()) {
88 vortex_panic!(
89 "Returned vector {:?} does not match expected dtype {}",
90 vector,
91 self.dtype()
92 );
93 }
94 }
95
96 Ok(vector)
97 }
98
99 fn reduce_children(&self) -> VortexResult<Option<ArrayRef>> {
100 <V::OperatorVTable as OperatorVTable<V>>::reduce_children(&self.0)
101 }
102
103 fn reduce_parent(&self, parent: &ArrayRef, child_idx: usize) -> VortexResult<Option<ArrayRef>> {
104 <V::OperatorVTable as OperatorVTable<V>>::reduce_parent(&self.0, parent, child_idx)
105 }
106
107 fn as_pipelined(&self) -> Option<&dyn PipelinedNode> {
108 <V::OperatorVTable as OperatorVTable<V>>::pipeline_node(&self.0)
109 }
110
111 fn bind(
112 &self,
113 selection: Option<&ArrayRef>,
114 ctx: &mut dyn BindCtx,
115 ) -> VortexResult<BatchKernelRef> {
116 <V::OperatorVTable as OperatorVTable<V>>::bind(&self.0, selection, ctx)
117 }
118}
119
120impl BindCtx for () {
122 fn bind(
123 &mut self,
124 array: &ArrayRef,
125 selection: Option<&ArrayRef>,
126 ) -> VortexResult<BatchKernelRef> {
127 array.bind(selection, self)
128 }
129}
130
131impl dyn Array + '_ {
132 pub fn execute(&self) -> VortexResult<Vector> {
133 self.execute_with_selection(&Mask::new_true(self.len()))
134 }
135
136 pub fn execute_with_selection(&self, selection: &Mask) -> VortexResult<Vector> {
137 assert_eq!(self.len(), selection.len());
138
139 if self.as_pipelined().is_some() {
141 return PipelineDriver::new(self.to_array()).execute(selection);
142 }
143
144 self.execute_batch(selection, &mut DummyExecutionCtx)
145 }
146}