vortex_scan/
range_scan.rs

1use std::future::Future;
2use std::ops::{BitAnd, Range};
3use std::sync::Arc;
4
5use vortex_array::compute::fill_null;
6use vortex_array::Array;
7use vortex_error::{VortexExpect, VortexResult};
8use vortex_expr::ExprRef;
9use vortex_mask::Mask;
10
11use crate::{RowMask, Scanner};
12
13/// A scan operation defined for a single row-range of the columnar data.
///
/// The scanner is a small state machine: callers drive it through `evaluate` or
/// `evaluate_async`, which repeatedly ask for the next expression to run and feed the
/// evaluated array back in until a final result is available.
14pub struct RangeScanner {
    /// Shared scan definition; supplies the filter conjuncts (`rev_filter`) and the projection.
15    scan: Arc<Scanner>,
    /// Absolute rows covered by this scanner: `row_offset..row_offset + mask.len()`.
16    row_range: Range<u64>,
    /// Row selection for this range. Starts as the mask given to `new` and is replaced by its
    /// intersection with each filter result while further conjuncts remain to be evaluated.
17    mask: Mask,
    /// Current step of the filter -> project -> ready progression.
18    state: State,
19}
20
/// Internal progression of a [`RangeScanner`]: filter evaluation, then projection, then done.
21enum State {
22    // First we run the filter conjuncts, one evaluation per conjunct.
    // The `Mask` is the selection handed to the evaluator for the next conjunct (all-true for
    // the first one; later ones may use the narrowed selection). The `Vec` holds the conjuncts
    // still to run; the next one to evaluate is its last element.
23    FilterEval((Mask, Vec<ExprRef>)),
24    // Then we project the selected rows.
    // Holds the selection to project and the projection expression.
25    Project((Mask, ExprRef)),
26    // And then we're done.
    // `None` is stored when filtering left no rows selected.
27    Ready(Option<Array>),
28}
29
30/// The next operation that should be performed. Either an expression to run, or the result
31/// of the [`RangeScanner`].
32pub enum NextOp {
33    /// The finished result of the scan.
    ///
    /// `None` means that no rows remained selected once the filter had been applied.
34    Ready(Option<Array>),
35    /// The next expression to evaluate.
36    /// The caller **must** first apply the mask before evaluating the expression.
    ///
    /// The payload is `(row_range, mask, expr)`: the absolute row range of the scanner, the
    /// selection within that range, and the expression to evaluate over the selected rows.
37    Eval((Range<u64>, Mask, ExprRef)),
38}
39
40/// We implement the range scan via polling for the next operation to perform, and then posting
41/// the result back to the range scan to make progress.
42///
43/// This allows us to minimize the amount of logic we duplicate in order to support both
44/// synchronous and asynchronous evaluation.
45///
46/// ## A note on the API of evaluator functions
47//
48// We have chosen a general "run this expression" API instead of separate
49// `filter(row_mask, expr) -> row_mask` + `project(row_mask, field_mask)` APIs. The reason for
50// this is so we can eventually support cell-level push-down.
51//
52// If we only projected using a field mask, then we would need to download all the data for
53// the selected rows of every projected field. When I say cell-level push-down, I mean
54// we can slice the cell directly out of storage using an API like
55// `SegmentReader::read(segment_id, byte_range: Range<usize>)`.
56//
57// Admittedly, this is a highly advanced use-case, but can prove invaluable for large cell values
58// such as images and video.
59//
60// If instead we make the projection API `project(row_mask, expr)`, then the project API is
61// identical to the filter API and there's no point having both. Hence, a single
62// `evaluate(row_mask, expr)` API.
63impl RangeScanner {
64    pub(crate) fn new(scan: Arc<Scanner>, row_offset: u64, mask: Mask) -> Self {
65        let state = if !scan.rev_filter.is_empty() {
66            // If we have a filter expression, then for now we evaluate it against all rows
67            // of the range.
68            // TODO(ngates): we should decide based on mask.true_count() whether to include the
69            //  current mask or not. But note that the resulting expression would need to be
70            //  aligned and intersected with the given mask.
71            State::FilterEval((Mask::new_true(mask.len()), scan.rev_filter.to_vec()))
72        } else {
73            // If there is no filter expression, then we immediately perform a mask + project.
74            State::Project((mask.clone(), scan.projection().clone()))
75        };
76
77        Self {
78            scan,
79            row_range: row_offset..row_offset + mask.len() as u64,
80            mask,
81            state,
82        }
83    }
84
85    /// Check for the next operation to perform.
86    /// Returns `Poll::Ready` when the scan is complete.
87    ///
88    // FIXME(ngates): currently we have to clone the Mask to return it. Doing this
89    //  forces the eager evaluation of the iterators.
90    fn next(&self) -> NextOp {
91        match &self.state {
92            State::FilterEval((mask, conjuncts)) => NextOp::Eval((
93                self.row_range.clone(),
94                mask.clone(),
95                conjuncts
96                    .last()
97                    .vortex_expect("conjuncts is always non-empty")
98                    .clone(),
99            )),
100            State::Project((mask, expr)) => {
101                NextOp::Eval((self.row_range.clone(), mask.clone(), expr.clone()))
102            }
103            State::Ready(array) => NextOp::Ready(array.clone()),
104        }
105    }
106
107    /// Post the result of the last expression evaluation back to the range scan.
108    fn transition(mut self, result: Array) -> VortexResult<Self> {
109        const APPLY_FILTER_SELECTIVITY_THRESHOLD: f64 = 0.2;
110        match self.state {
111            State::FilterEval((eval_mask, mut conjuncts_rev)) => {
112                // conjuncts are non-empty here
113                conjuncts_rev.pop();
114
115                let result = fill_null(result, false.into())?;
116
117                // Intersect the result of the filter expression with our initial row mask.
118                let mask = Mask::try_from(result)?;
119
120                // We passed a full mask to the eval function so we must bit intersect instead
121                // of set-bit intersection if we massed a non-full mask to the evaluator.
122                let mask = if self.mask.len() == eval_mask.true_count() {
123                    self.mask.bitand(&mask)
124                } else {
125                    self.mask.intersect_by_rank(&mask)
126                };
127
128                // Then move onto the projection
129                if mask.true_count() == 0 {
130                    // If the mask is empty, then we're done.
131                    self.state = State::Ready(None);
132                } else if !conjuncts_rev.is_empty() {
133                    self.mask = mask;
134                    let mask = if self.mask.density() < APPLY_FILTER_SELECTIVITY_THRESHOLD {
135                        self.mask.clone()
136                    } else {
137                        Mask::new_true(self.mask.len())
138                    };
139                    // conjuncts_rev is again non-empty, so we can put it into FilterEval
140                    self.state = State::FilterEval((mask, conjuncts_rev))
141                } else {
142                    self.state = State::Project((mask, self.scan.projection().clone()))
143                }
144            }
145            State::Project(_) => {
146                // We're done.
147                assert!(!result.is_empty(), "projected array cannot be empty");
148                self.state = State::Ready(Some(result));
149            }
150            State::Ready(_) => {}
151        }
152        Ok(self)
153    }
154
155    /// Evaluate the [`RangeScanner`] operation using a synchronous expression evaluator.
156    pub fn evaluate<E>(mut self, evaluator: E) -> VortexResult<Option<Array>>
157    where
158        E: Fn(RowMask, ExprRef) -> VortexResult<Array>,
159    {
160        loop {
161            match self.next() {
162                NextOp::Ready(array) => return Ok(array),
163                NextOp::Eval((row_range, mask, expr)) => {
164                    self =
165                        self.transition(evaluator(RowMask::new(mask, row_range.start), expr)?)?;
166                }
167            }
168        }
169    }
170
171    /// Evaluate the [`RangeScanner`] operation using an async expression evaluator.
172    pub async fn evaluate_async<E, F>(mut self, evaluator: E) -> VortexResult<Option<Array>>
173    where
174        E: Fn(RowMask, ExprRef) -> F,
175        F: Future<Output = VortexResult<Array>>,
176    {
177        loop {
178            match self.next() {
179                NextOp::Ready(array) => return Ok(array),
180                NextOp::Eval((row_range, mask, expr)) => {
181                    self = self
182                        .transition(evaluator(RowMask::new(mask, row_range.start), expr).await?)?;
183                }
184            }
185        }
186    }
187}
188
189#[cfg(test)]
190mod tests {
191    use std::sync::Arc;
192
193    use vortex_array::array::{BoolArray, PrimitiveArray, StructArray};
194    use vortex_array::compute::filter;
195    use vortex_array::variants::StructArrayTrait;
196    use vortex_array::{IntoArray, IntoArrayVariant};
197    use vortex_dtype::Nullability::NonNullable;
198    use vortex_dtype::PType::U64;
199    use vortex_dtype::{DType, StructDType};
200    use vortex_expr::{and, get_item, gt, ident, lit};
201    use vortex_mask::Mask;
202
203    use crate::{RangeScanner, Scanner};
204
205    fn dtype() -> DType {
206        DType::Struct(
207            Arc::new(StructDType::new(
208                vec!["a".into(), "b".into(), "c".into()].into(),
209                vec![U64.into(), U64.into(), U64.into()],
210            )),
211            NonNullable,
212        )
213    }
214
215    #[test]
216    fn range_scan_few_conj_filter_low_selectivity() {
217        let expr_a = gt(get_item("a", ident()), lit(1));
218        let expr_b = gt(get_item("b", ident()), lit(2));
219        let expr_c = gt(get_item("c", ident()), lit(3));
220        let scan = Arc::new(
221            Scanner::new(
222                dtype(),
223                ident(),
224                Some(and(expr_a.clone(), and(expr_b.clone(), expr_c.clone()))),
225            )
226            .unwrap(),
227        );
228        let len = 1000;
229        let range = RangeScanner::new(scan, 0, Mask::new_true(len));
230
231        let res = range
232            .evaluate(|mask, expr| {
233                let arr = if expr == expr_a.clone() {
234                    BoolArray::from_iter((0..mask.len()).map(|i| !(i > 10 && i < 30))).into_array()
235                } else if expr == expr_b.clone() {
236                    BoolArray::from_iter((0..mask.len()).map(|i| !(i > 100 && i < 130)))
237                        .into_array()
238                } else if expr == expr_c.clone() {
239                    BoolArray::from_iter((0..mask.len()).map(|i| !(i > 510 && i < 530)))
240                        .into_array()
241                } else if expr == ident() {
242                    let arr = PrimitiveArray::from_iter(0..mask.len() as u64).into_array();
243                    StructArray::from_fields(
244                        [("a", arr.clone()), ("b", arr.clone()), ("c", arr)].as_slice(),
245                    )
246                    .unwrap()
247                    .into_array()
248                } else {
249                    unreachable!("unexpected expression {:?}", expr)
250                };
251
252                filter(&arr, mask.filter_mask())
253            })
254            .unwrap()
255            .unwrap();
256
257        assert!(res.as_struct_array().is_some());
258        let field = res.into_struct().unwrap().maybe_null_field_by_name("a");
259
260        assert_eq!(
261            field.unwrap().into_primitive().unwrap().as_slice::<u64>(),
262            (0..len as u64)
263                .filter(|&i| {
264                    (i <= 10 || i >= 30) && (i <= 100 || i >= 130) && (i <= 510 || i >= 530)
265                })
266                .collect::<Vec<_>>()
267                .as_slice()
268        );
269    }
270
271    #[test]
272    fn range_scan_few_conj_filter_high_overlapping_selectivity() {
273        let expr_a = gt(get_item("a", ident()), lit(1));
274        let expr_b = gt(get_item("b", ident()), lit(2));
275        let expr_c = gt(get_item("c", ident()), lit(3));
276        let scan = Arc::new(
277            Scanner::new(
278                dtype(),
279                ident(),
280                Some(and(expr_a.clone(), and(expr_b.clone(), expr_c.clone()))),
281            )
282            .unwrap(),
283        );
284        let len = 1000;
285        let range = RangeScanner::new(scan, 0, Mask::new_true(len));
286
287        let res = range
288            .evaluate(|mask, expr| {
289                let arr = if expr == expr_a.clone() {
290                    BoolArray::from_iter((0..mask.len()).map(|i| !(i > 10 && i < 900))).into_array()
291                } else if expr == expr_b.clone() {
292                    BoolArray::from_iter((0..mask.len()).map(|i| !(i > 100 && i < 950)))
293                        .into_array()
294                } else if expr == expr_c.clone() {
295                    BoolArray::from_iter((0..mask.len()).map(|i| !(i > 210 && i < 990)))
296                        .into_array()
297                } else if expr == ident() {
298                    let arr = PrimitiveArray::from_iter(0..mask.len() as u64).into_array();
299                    StructArray::from_fields(
300                        [("a", arr.clone()), ("b", arr.clone()), ("c", arr)].as_slice(),
301                    )
302                    .unwrap()
303                    .into_array()
304                } else {
305                    unreachable!("unexpected expression {:?}", expr)
306                };
307
308                filter(&arr, mask.filter_mask())
309            })
310            .unwrap()
311            .unwrap();
312
313        assert!(res.as_struct_array().is_some());
314
315        let field = res.into_struct().unwrap().maybe_null_field_by_name("a");
316
317        assert_eq!(
318            field.unwrap().into_primitive().unwrap().as_slice::<u64>(),
319            (0..len as u64)
320                .filter(|&i| !(i > 10 && i < 990))
321                .collect::<Vec<_>>()
322                .as_slice()
323        );
324    }
325}