vortex_sequence/
operator.rs1use std::any::Any;
5use std::hash::{Hash, Hasher};
6use std::sync::Arc;
7
8use num_traits::{ConstOne, PrimInt};
9use vortex_array::Array;
10use vortex_array::operator::slice::SliceOperator;
11use vortex_array::operator::{Operator, OperatorEq, OperatorHash, OperatorId, OperatorRef};
12use vortex_array::pipeline::view::ViewMut;
13use vortex_array::pipeline::{BindContext, Element, Kernel, KernelContext, N, PipelinedOperator};
14use vortex_array::vtable::PipelineVTable;
15use vortex_dtype::{DType, NativePType, match_each_integer_ptype};
16use vortex_error::{VortexResult, vortex_err};
17
18use crate::{SequenceArray, SequenceVTable};
19
20impl PipelineVTable<SequenceVTable> for SequenceVTable {
21 fn to_operator(array: &SequenceArray) -> VortexResult<Option<OperatorRef>> {
22 Ok(Some(Arc::new(array.clone())))
23 }
24}
25
26impl OperatorHash for SequenceArray {
27 fn operator_hash<H: Hasher>(&self, state: &mut H) {
28 self.base().hash(state);
29 self.multiplier().hash(state);
30 self.dtype().hash(state);
31 self.len().hash(state);
32 }
33}
34
35impl OperatorEq for SequenceArray {
36 fn operator_eq(&self, other: &Self) -> bool {
37 self.base() == other.base()
38 && self.multiplier() == other.multiplier()
39 && self.dtype() == other.dtype()
40 && self.len() == other.len()
41 }
42}
43
44impl Operator for SequenceArray {
45 fn id(&self) -> OperatorId {
46 self.encoding_id()
47 }
48
49 fn as_any(&self) -> &dyn Any {
50 self
51 }
52
53 fn dtype(&self) -> &DType {
54 Array::dtype(self.as_ref())
55 }
56
57 fn len(&self) -> usize {
58 Array::len(self.as_ref())
59 }
60
61 fn children(&self) -> &[OperatorRef] {
62 &[]
63 }
64
65 fn with_children(self: Arc<Self>, _children: Vec<OperatorRef>) -> VortexResult<OperatorRef> {
66 Ok(self)
67 }
68
69 fn reduce_parent(
70 &self,
71 parent: OperatorRef,
72 _child_idx: usize,
73 ) -> VortexResult<Option<OperatorRef>> {
74 if let Some(slice) = parent.as_any().downcast_ref::<SliceOperator>() {
76 let range = slice.range();
77 return Ok(Some(Arc::new(SequenceArray::unchecked_new(
78 self.index_value(range.start),
79 self.multiplier(),
80 self.ptype(),
81 self.dtype().nullability(),
82 range.len(),
83 ))));
84 }
85
86 Ok(None)
87 }
88
89 fn as_pipelined(&self) -> Option<&dyn PipelinedOperator> {
90 Some(self)
91 }
92}
93
94impl PipelinedOperator for SequenceArray {
95 fn bind(&self, _ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>> {
96 Ok(match_each_integer_ptype!(self.ptype(), |T| {
97 if self.multiplier().as_primitive::<T>() == <T as ConstOne>::ONE {
98 Box::new(SequenceKernel::<T> {
99 base: self.base().as_primitive::<T>(),
100 len: self.len(),
101 offset: 0,
102 })
103 } else {
104 Box::new(MultiplierSequenceKernel::<T> {
105 base: self.base().as_primitive::<T>(),
106 multiplier: self.multiplier().as_primitive::<T>(),
107 len: self.len(),
108 offset: 0,
109 })
110 }
111 }))
112 }
113
114 fn vector_children(&self) -> Vec<usize> {
115 vec![]
116 }
117
118 fn batch_children(&self) -> Vec<usize> {
119 vec![]
120 }
121}
122
/// Kernel state for a sequence whose multiplier is one.
///
/// Element `i` of the sequence is `base + i`.
struct SequenceKernel<T> {
    /// Value of the sequence at index zero.
    base: T,
    /// Total number of elements the kernel produces.
    len: usize,
    /// Number of elements already produced by earlier steps.
    offset: usize,
}
128
129impl<T: Element + NativePType + PrimInt> Kernel for SequenceKernel<T> {
130 fn step(&mut self, _ctx: &KernelContext, out: &mut ViewMut) -> VortexResult<()> {
131 let values = out.as_slice_mut::<T>();
133 let len = (self.len - self.offset).min(N);
134 for i in 0..len {
135 values[i] = self.base
136 + T::from_usize(self.offset + i)
137 .ok_or_else(|| vortex_err!("Overflow converting usize to ptype"))?;
138 }
139 out.set_len(len);
140 self.offset += len;
141 Ok(())
142 }
143}
144
/// Kernel state for a sequence with an arbitrary multiplier.
///
/// Element `i` of the sequence is `base + multiplier * i`.
struct MultiplierSequenceKernel<T> {
    /// Value of the sequence at index zero.
    base: T,
    /// Difference between consecutive elements.
    multiplier: T,
    /// Total number of elements the kernel produces.
    len: usize,
    /// Number of elements already produced by earlier steps.
    offset: usize,
}
151
152impl<T: Element + NativePType + PrimInt> Kernel for MultiplierSequenceKernel<T> {
153 fn step(&mut self, _ctx: &KernelContext, out: &mut ViewMut) -> VortexResult<()> {
154 let values = out.as_slice_mut::<T>();
156 let len = (self.len - self.offset).min(N);
157 for i in 0..len {
158 values[i] = self.base
159 + self
160 .multiplier
161 .checked_mul(
162 &T::from_usize(self.offset + i)
163 .ok_or_else(|| vortex_err!("Overflow converting usize to ptype"))?,
164 )
165 .ok_or_else(|| vortex_err!("Overflow computing sequence value"))?;
166 }
167 out.set_len(len);
168 self.offset += len;
169 Ok(())
170 }
171}