use std::future::Future;
use std::ops::{BitAnd, Range};
use std::sync::Arc;

use vortex_array::compute::fill_null;
use vortex_array::Array;
use vortex_error::{VortexExpect, VortexResult};
use vortex_expr::ExprRef;
use vortex_mask::Mask;

use crate::{RowMask, Scanner};
12
13pub struct RangeScanner {
15 scan: Arc<Scanner>,
16 row_range: Range<u64>,
17 mask: Mask,
18 state: State,
19}
20
21enum State {
22 FilterEval((Mask, Vec<ExprRef>)),
24 Project((Mask, ExprRef)),
26 Ready(Option<Array>),
28}
29
30pub enum NextOp {
33 Ready(Option<Array>),
35 Eval((Range<u64>, Mask, ExprRef)),
38}
39
40impl RangeScanner {
64 pub(crate) fn new(scan: Arc<Scanner>, row_offset: u64, mask: Mask) -> Self {
65 let state = if !scan.rev_filter.is_empty() {
66 State::FilterEval((Mask::new_true(mask.len()), scan.rev_filter.to_vec()))
72 } else {
73 State::Project((mask.clone(), scan.projection().clone()))
75 };
76
77 Self {
78 scan,
79 row_range: row_offset..row_offset + mask.len() as u64,
80 mask,
81 state,
82 }
83 }
84
85 fn next(&self) -> NextOp {
91 match &self.state {
92 State::FilterEval((mask, conjuncts)) => NextOp::Eval((
93 self.row_range.clone(),
94 mask.clone(),
95 conjuncts
96 .last()
97 .vortex_expect("conjuncts is always non-empty")
98 .clone(),
99 )),
100 State::Project((mask, expr)) => {
101 NextOp::Eval((self.row_range.clone(), mask.clone(), expr.clone()))
102 }
103 State::Ready(array) => NextOp::Ready(array.clone()),
104 }
105 }
106
107 fn transition(mut self, result: Array) -> VortexResult<Self> {
109 const APPLY_FILTER_SELECTIVITY_THRESHOLD: f64 = 0.2;
110 match self.state {
111 State::FilterEval((eval_mask, mut conjuncts_rev)) => {
112 conjuncts_rev.pop();
114
115 let result = fill_null(result, false.into())?;
116
117 let mask = Mask::try_from(result)?;
119
120 let mask = if self.mask.len() == eval_mask.true_count() {
123 self.mask.bitand(&mask)
124 } else {
125 self.mask.intersect_by_rank(&mask)
126 };
127
128 if mask.true_count() == 0 {
130 self.state = State::Ready(None);
132 } else if !conjuncts_rev.is_empty() {
133 self.mask = mask;
134 let mask = if self.mask.density() < APPLY_FILTER_SELECTIVITY_THRESHOLD {
135 self.mask.clone()
136 } else {
137 Mask::new_true(self.mask.len())
138 };
139 self.state = State::FilterEval((mask, conjuncts_rev))
141 } else {
142 self.state = State::Project((mask, self.scan.projection().clone()))
143 }
144 }
145 State::Project(_) => {
146 assert!(!result.is_empty(), "projected array cannot be empty");
148 self.state = State::Ready(Some(result));
149 }
150 State::Ready(_) => {}
151 }
152 Ok(self)
153 }
154
155 pub fn evaluate<E>(mut self, evaluator: E) -> VortexResult<Option<Array>>
157 where
158 E: Fn(RowMask, ExprRef) -> VortexResult<Array>,
159 {
160 loop {
161 match self.next() {
162 NextOp::Ready(array) => return Ok(array),
163 NextOp::Eval((row_range, mask, expr)) => {
164 self =
165 self.transition(evaluator(RowMask::new(mask, row_range.start), expr)?)?;
166 }
167 }
168 }
169 }
170
171 pub async fn evaluate_async<E, F>(mut self, evaluator: E) -> VortexResult<Option<Array>>
173 where
174 E: Fn(RowMask, ExprRef) -> F,
175 F: Future<Output = VortexResult<Array>>,
176 {
177 loop {
178 match self.next() {
179 NextOp::Ready(array) => return Ok(array),
180 NextOp::Eval((row_range, mask, expr)) => {
181 self = self
182 .transition(evaluator(RowMask::new(mask, row_range.start), expr).await?)?;
183 }
184 }
185 }
186 }
187}
188
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::array::{BoolArray, PrimitiveArray, StructArray};
    use vortex_array::compute::filter;
    use vortex_array::variants::StructArrayTrait;
    use vortex_array::{IntoArray, IntoArrayVariant};
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::U64;
    use vortex_dtype::{DType, StructDType};
    use vortex_expr::{and, get_item, gt, ident, lit};
    use vortex_mask::Mask;

    use crate::{RangeScanner, Scanner};

    /// Schema shared by the tests: a non-nullable struct with three `u64` fields.
    fn dtype() -> DType {
        DType::Struct(
            Arc::new(StructDType::new(
                vec!["a".into(), "b".into(), "c".into()].into(),
                vec![U64.into(), U64.into(), U64.into()],
            )),
            NonNullable,
        )
    }

    /// Scans `len` rows with the filter `a > 1 AND b > 2 AND c > 3` and an identity
    /// projection, and returns the surviving values of column `a`.
    ///
    /// The evaluator is a stub: the conjunct on `a`, `b` and `c` rejects exactly the rows
    /// strictly inside the first, second and third `(lo, hi)` window respectively, and the
    /// projection yields a struct whose three columns all hold `0..n`, where `n` is the
    /// length of the row mask it is handed. Every stub result is filtered by that mask, as
    /// a real evaluator is expected to do.
    fn scan_column_a(len: usize, rejected: [(usize, usize); 3]) -> Vec<u64> {
        let [window_a, window_b, window_c] = rejected;

        let expr_a = gt(get_item("a", ident()), lit(1));
        let expr_b = gt(get_item("b", ident()), lit(2));
        let expr_c = gt(get_item("c", ident()), lit(3));
        let scan = Arc::new(
            Scanner::new(
                dtype(),
                ident(),
                Some(and(expr_a.clone(), and(expr_b.clone(), expr_c.clone()))),
            )
            .unwrap(),
        );
        let range = RangeScanner::new(scan, 0, Mask::new_true(len));

        // Boolean column of `n` rows that is `false` strictly inside the window.
        let outside = |(lo, hi): (usize, usize), n: usize| {
            BoolArray::from_iter((0..n).map(move |i| !(i > lo && i < hi))).into_array()
        };

        let res = range
            .evaluate(|mask, expr| {
                let arr = if expr == expr_a.clone() {
                    outside(window_a, mask.len())
                } else if expr == expr_b.clone() {
                    outside(window_b, mask.len())
                } else if expr == expr_c.clone() {
                    outside(window_c, mask.len())
                } else if expr == ident() {
                    let arr = PrimitiveArray::from_iter(0..mask.len() as u64).into_array();
                    StructArray::from_fields(
                        [("a", arr.clone()), ("b", arr.clone()), ("c", arr)].as_slice(),
                    )
                    .unwrap()
                    .into_array()
                } else {
                    unreachable!("unexpected expression {:?}", expr)
                };

                filter(&arr, mask.filter_mask())
            })
            .unwrap()
            .unwrap();

        assert!(res.as_struct_array().is_some());
        let field = res.into_struct().unwrap().maybe_null_field_by_name("a");
        let column = field.unwrap().into_primitive().unwrap();
        column.as_slice::<u64>().to_vec()
    }

    /// Each conjunct rejects only a handful of rows, so the remaining mask stays dense.
    #[test]
    fn range_scan_few_conj_filter_low_selectivity() {
        let len = 1000;
        let actual = scan_column_a(len, [(10, 30), (100, 130), (510, 530)]);

        let expected = (0..len as u64)
            .filter(|&i| {
                (i <= 10 || i >= 30) && (i <= 100 || i >= 130) && (i <= 510 || i >= 530)
            })
            .collect::<Vec<_>>();
        assert_eq!(actual, expected);
    }

    /// The conjuncts reject large, overlapping windows, so the remaining mask becomes
    /// sparse after the first conjunct.
    #[test]
    fn range_scan_few_conj_filter_high_overlapping_selectivity() {
        let len = 1000;
        let actual = scan_column_a(len, [(10, 900), (100, 950), (210, 990)]);

        let expected = (0..len as u64)
            .filter(|&i| !(i > 10 && i < 990))
            .collect::<Vec<_>>();
        assert_eq!(actual, expected);
    }
}
325}