vortex_sequence/
operator.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::hash::{Hash, Hasher};
6use std::sync::Arc;
7
8use num_traits::{ConstOne, PrimInt};
9use vortex_array::Array;
10use vortex_array::operator::slice::SliceOperator;
11use vortex_array::operator::{Operator, OperatorEq, OperatorHash, OperatorId, OperatorRef};
12use vortex_array::pipeline::view::ViewMut;
13use vortex_array::pipeline::{BindContext, Element, Kernel, KernelContext, N, PipelinedOperator};
14use vortex_array::vtable::PipelineVTable;
15use vortex_dtype::{DType, NativePType, match_each_integer_ptype};
16use vortex_error::{VortexResult, vortex_err};
17
18use crate::{SequenceArray, SequenceVTable};
19
20impl PipelineVTable<SequenceVTable> for SequenceVTable {
21    fn to_operator(array: &SequenceArray) -> VortexResult<Option<OperatorRef>> {
22        Ok(Some(Arc::new(array.clone())))
23    }
24}
25
26impl OperatorHash for SequenceArray {
27    fn operator_hash<H: Hasher>(&self, state: &mut H) {
28        self.base().hash(state);
29        self.multiplier().hash(state);
30        self.dtype().hash(state);
31        self.len().hash(state);
32    }
33}
34
35impl OperatorEq for SequenceArray {
36    fn operator_eq(&self, other: &Self) -> bool {
37        self.base() == other.base()
38            && self.multiplier() == other.multiplier()
39            && self.dtype() == other.dtype()
40            && self.len() == other.len()
41    }
42}
43
44impl Operator for SequenceArray {
45    fn id(&self) -> OperatorId {
46        self.encoding_id()
47    }
48
49    fn as_any(&self) -> &dyn Any {
50        self
51    }
52
53    fn dtype(&self) -> &DType {
54        Array::dtype(self.as_ref())
55    }
56
57    fn len(&self) -> usize {
58        Array::len(self.as_ref())
59    }
60
61    fn children(&self) -> &[OperatorRef] {
62        &[]
63    }
64
65    fn with_children(self: Arc<Self>, _children: Vec<OperatorRef>) -> VortexResult<OperatorRef> {
66        Ok(self)
67    }
68
69    fn reduce_parent(
70        &self,
71        parent: OperatorRef,
72        _child_idx: usize,
73    ) -> VortexResult<Option<OperatorRef>> {
74        // Push down slice
75        if let Some(slice) = parent.as_any().downcast_ref::<SliceOperator>() {
76            let range = slice.range();
77            return Ok(Some(Arc::new(SequenceArray::unchecked_new(
78                self.index_value(range.start),
79                self.multiplier(),
80                self.ptype(),
81                self.dtype().nullability(),
82                range.len(),
83            ))));
84        }
85
86        Ok(None)
87    }
88
89    fn as_pipelined(&self) -> Option<&dyn PipelinedOperator> {
90        Some(self)
91    }
92}
93
94impl PipelinedOperator for SequenceArray {
95    fn bind(&self, _ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>> {
96        Ok(match_each_integer_ptype!(self.ptype(), |T| {
97            if self.multiplier().as_primitive::<T>() == <T as ConstOne>::ONE {
98                Box::new(SequenceKernel::<T> {
99                    base: self.base().as_primitive::<T>(),
100                    len: self.len(),
101                    offset: 0,
102                })
103            } else {
104                Box::new(MultiplierSequenceKernel::<T> {
105                    base: self.base().as_primitive::<T>(),
106                    multiplier: self.multiplier().as_primitive::<T>(),
107                    len: self.len(),
108                    offset: 0,
109                })
110            }
111        }))
112    }
113
114    fn vector_children(&self) -> Vec<usize> {
115        vec![]
116    }
117
118    fn batch_children(&self) -> Vec<usize> {
119        vec![]
120    }
121}
122
123struct SequenceKernel<T> {
124    base: T,
125    len: usize,
126    offset: usize,
127}
128
129impl<T: Element + NativePType + PrimInt> Kernel for SequenceKernel<T> {
130    fn step(&mut self, _ctx: &KernelContext, out: &mut ViewMut) -> VortexResult<()> {
131        // TODO(ngates): benchmark and optimize this
132        let values = out.as_slice_mut::<T>();
133        let len = (self.len - self.offset).min(N);
134        for i in 0..len {
135            values[i] = self.base
136                + T::from_usize(self.offset + i)
137                    .ok_or_else(|| vortex_err!("Overflow converting usize to ptype"))?;
138        }
139        out.set_len(len);
140        self.offset += len;
141        Ok(())
142    }
143}
144
145struct MultiplierSequenceKernel<T> {
146    base: T,
147    multiplier: T,
148    len: usize,
149    offset: usize,
150}
151
152impl<T: Element + NativePType + PrimInt> Kernel for MultiplierSequenceKernel<T> {
153    fn step(&mut self, _ctx: &KernelContext, out: &mut ViewMut) -> VortexResult<()> {
154        // TODO(ngates): benchmark and optimize this. We should use addition not multiplication
155        let values = out.as_slice_mut::<T>();
156        let len = (self.len - self.offset).min(N);
157        for i in 0..len {
158            values[i] = self.base
159                + self
160                    .multiplier
161                    .checked_mul(
162                        &T::from_usize(self.offset + i)
163                            .ok_or_else(|| vortex_err!("Overflow converting usize to ptype"))?,
164                    )
165                    .ok_or_else(|| vortex_err!("Overflow computing sequence value"))?;
166        }
167        out.set_len(len);
168        self.offset += len;
169        Ok(())
170    }
171}