vortex_sequence/
operator.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::hash::{Hash, Hasher};
6use std::sync::Arc;
7
8use num_traits::{ConstOne, PrimInt};
9use vortex_array::Array;
10use vortex_array::operator::slice::SliceOperator;
11use vortex_array::operator::{
12    LengthBounds, Operator, OperatorEq, OperatorHash, OperatorId, OperatorRef,
13};
14use vortex_array::pipeline::bits::BitView;
15use vortex_array::pipeline::vec::Selection;
16use vortex_array::pipeline::view::ViewMut;
17use vortex_array::pipeline::{
18    BindContext, Element, Kernel, KernelContext, N, PipelinedOperator, RowSelection,
19};
20use vortex_array::vtable::PipelineVTable;
21use vortex_dtype::{DType, IntegerPType, NativePType, match_each_integer_ptype};
22use vortex_error::{VortexResult, vortex_err};
23
24use crate::{SequenceArray, SequenceVTable};
25
26impl PipelineVTable<SequenceVTable> for SequenceVTable {
27    fn to_operator(array: &SequenceArray) -> VortexResult<Option<OperatorRef>> {
28        Ok(Some(Arc::new(array.clone())))
29    }
30}
31
32impl OperatorHash for SequenceArray {
33    fn operator_hash<H: Hasher>(&self, state: &mut H) {
34        self.base().hash(state);
35        self.multiplier().hash(state);
36        self.dtype().hash(state);
37        self.bounds().hash(state);
38    }
39}
40
41impl OperatorEq for SequenceArray {
42    fn operator_eq(&self, other: &Self) -> bool {
43        self.base() == other.base()
44            && self.multiplier() == other.multiplier()
45            && self.dtype() == other.dtype()
46            && self.bounds() == other.bounds()
47    }
48}
49
50impl Operator for SequenceArray {
51    fn id(&self) -> OperatorId {
52        self.encoding_id()
53    }
54
55    fn as_any(&self) -> &dyn Any {
56        self
57    }
58
59    fn dtype(&self) -> &DType {
60        Array::dtype(self.as_ref())
61    }
62
63    fn bounds(&self) -> LengthBounds {
64        Array::len(self.as_ref()).into()
65    }
66
67    fn children(&self) -> &[OperatorRef] {
68        &[]
69    }
70
71    fn with_children(self: Arc<Self>, _children: Vec<OperatorRef>) -> VortexResult<OperatorRef> {
72        Ok(self)
73    }
74
75    fn reduce_parent(
76        &self,
77        parent: OperatorRef,
78        _child_idx: usize,
79    ) -> VortexResult<Option<OperatorRef>> {
80        // Push down slice
81        if let Some(slice) = parent.as_any().downcast_ref::<SliceOperator>() {
82            let range = slice.range();
83            return Ok(Some(Arc::new(SequenceArray::unchecked_new(
84                self.index_value(range.start),
85                self.multiplier(),
86                self.ptype(),
87                self.dtype().nullability(),
88                range.len(),
89            ))));
90        }
91
92        Ok(None)
93    }
94
95    fn as_pipelined(&self) -> Option<&dyn PipelinedOperator> {
96        Some(self)
97    }
98}
99
100impl PipelinedOperator for SequenceArray {
101    fn row_selection(&self) -> RowSelection {
102        RowSelection::Domain(self.as_ref().len())
103    }
104
105    fn bind(&self, _ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>> {
106        Ok(match_each_integer_ptype!(self.ptype(), |T| {
107            if self.multiplier().as_primitive::<T>() == <T as ConstOne>::ONE {
108                Box::new(SequenceKernel::<T> {
109                    base: self.base().as_primitive::<T>(),
110                    len: Array::len(self.as_ref()),
111                })
112            } else {
113                Box::new(MultiplierSequenceKernel::<T> {
114                    base: self.base().as_primitive::<T>(),
115                    multiplier: self.multiplier().as_primitive::<T>(),
116                    len: Array::len(self.as_ref()),
117                })
118            }
119        }))
120    }
121
122    fn vector_children(&self) -> Vec<usize> {
123        vec![]
124    }
125
126    fn batch_children(&self) -> Vec<usize> {
127        vec![]
128    }
129}
130
131struct SequenceKernel<T> {
132    base: T,
133    len: usize,
134}
135
136impl<T: Element + IntegerPType> Kernel for SequenceKernel<T> {
137    fn step(
138        &self,
139        _ctx: &KernelContext,
140        step_idx: usize,
141        selection: &BitView,
142        out: &mut ViewMut,
143    ) -> VortexResult<()> {
144        // TODO(ngates): benchmark and optimize this
145        let values = out.as_array_mut::<T>();
146        let offset = step_idx * N;
147
148        // Check if we're in the final chunk to avoid overflow
149        if (offset + N) > self.len {
150            selection.try_iter_ones(|i| {
151                values[i] = self.base
152                    + T::from_usize(offset + i)
153                        .ok_or_else(|| vortex_err!("Overflow converting usize to ptype"))?;
154                Ok(())
155            })?;
156        } else {
157            for i in 0..N {
158                values[i] = self.base
159                    + T::from_usize(offset + i)
160                        .ok_or_else(|| vortex_err!("Overflow converting usize to ptype"))?;
161            }
162        }
163
164        match selection.true_count() {
165            0 | N => out.set_selection(Selection::Prefix),
166            _ => out.set_selection(Selection::Mask),
167        }
168
169        Ok(())
170    }
171}
172
173struct MultiplierSequenceKernel<T> {
174    base: T,
175    multiplier: T,
176    len: usize,
177}
178
179impl<T: Element + NativePType + PrimInt> Kernel for MultiplierSequenceKernel<T> {
180    fn step(
181        &self,
182        _ctx: &KernelContext,
183        chunk_idx: usize,
184        selection: &BitView,
185        out: &mut ViewMut,
186    ) -> VortexResult<()> {
187        // TODO(ngates): benchmark and optimize this. We should use addition not multiplication
188        let values = out.as_array_mut::<T>();
189        let offset = chunk_idx * N;
190
191        if (offset + N) > self.len {
192            selection.try_iter_ones(|i| {
193                values[i] = self.base
194                    + self
195                        .multiplier
196                        .checked_mul(
197                            &T::from_usize(offset + i)
198                                .ok_or_else(|| vortex_err!("Overflow converting usize to ptype"))?,
199                        )
200                        .ok_or_else(|| vortex_err!("Overflow computing sequence value"))?;
201                Ok(())
202            })?;
203        } else {
204            for i in 0..N {
205                values[i] = self.base
206                    + self
207                        .multiplier
208                        .checked_mul(
209                            &T::from_usize(offset + i)
210                                .ok_or_else(|| vortex_err!("Overflow converting usize to ptype"))?,
211                        )
212                        .ok_or_else(|| vortex_err!("Overflow computing sequence value"))?;
213            }
214        }
215
216        match selection.true_count() {
217            0 | N => out.set_selection(Selection::Prefix),
218            _ => out.set_selection(Selection::Mask),
219        }
220
221        Ok(())
222    }
223}