vortex_scan/range_scan.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
use std::future::Future;
use std::ops::{BitAnd, Range};
use std::sync::Arc;
use vortex_array::compute::fill_null;
use vortex_array::Array;
use vortex_error::{VortexExpect, VortexResult};
use vortex_expr::ExprRef;
use vortex_mask::Mask;
use crate::{RowMask, Scanner};
/// A scan operation defined for a single row-range of the columnar data.
///
/// The scanner is a small state machine: it hands out the next expression to evaluate and
/// advances once the result of that evaluation is posted back. [`RangeScanner::evaluate`] and
/// [`RangeScanner::evaluate_async`] drive that loop to completion.
pub struct RangeScanner {
    /// The scan definition, which supplies the filter conjuncts and the projection expression.
    scan: Arc<Scanner>,
    /// The row range covered by this scanner: it starts at the row offset passed to the
    /// constructor and spans `mask.len()` rows.
    row_range: Range<u64>,
    /// The row selection within `row_range`. Starts out as the mask passed to the constructor
    /// and is replaced by the narrowed selection after a filter conjunct has been applied
    /// while further conjuncts remain.
    mask: Mask,
    /// The current phase of the scan.
    state: State,
}
/// The phases a [`RangeScanner`] moves through.
enum State {
    /// First we run the filter conjuncts, one evaluation at a time.
    ///
    /// Holds the mask handed to the evaluator for the next conjunct, together with the
    /// conjuncts still to be evaluated. The conjunct evaluated next is the *last* element of
    /// the vector, which is never empty while in this state.
    FilterEval((Mask, Vec<ExprRef>)),
    /// Then we project the selected rows.
    ///
    /// Holds the row selection and the projection expression.
    Project((Mask, ExprRef)),
    /// And then we're done.
    ///
    /// Holds the projected array, or `None` when the filter left no rows selected.
    Ready(Option<Array>),
}
/// The next operation that should be performed. Either an expression to run, or the result
/// of the [`RangeScanner`].
pub enum NextOp {
    /// The finished result of the scan.
    ///
    /// `None` means the filter left no rows selected.
    Ready(Option<Array>),
    /// The next expression to evaluate.
    /// The caller **must** first apply the mask before evaluating the expression.
    ///
    /// The tuple holds, in order: the row range of the scanner, the mask to apply, and the
    /// expression to evaluate.
    Eval((Range<u64>, Mask, ExprRef)),
}
/// We implement the range scan via polling for the next operation to perform, and then posting
/// the result back to the range scan to make progress.
///
/// This allows us to minimize the amount of logic we duplicate in order to support both
/// synchronous and asynchronous evaluation.
//
// ## A note on the API of evaluator functions
//
// We have chosen a general "run this expression" API instead of separate
// `filter(row_mask, expr) -> row_mask` + `project(row_mask, field_mask)` APIs. The reason for
// this is so we can eventually support cell-level push-down.
//
// If we only projected using a field mask, we would need to download all of the data for
// the rows of each field that are present in the row mask. By cell-level push-down, I mean
// that we can slice the cell directly out of storage using an API like
// `SegmentReader::read(segment_id, byte_range: Range<usize>)`.
//
// Admittedly, this is a highly advanced use-case, but can prove invaluable for large cell values
// such as images and video.
//
// If instead we make the projection API `project(row_mask, expr)`, then the project API is
// identical to the filter API and there's no point having both. Hence, a single
// `evaluate(row_mask, expr)` API.
impl RangeScanner {
/// Create a scanner over the `mask.len()` rows starting at `row_offset`.
///
/// * `scan` - the scan definition providing the filter conjuncts and the projection.
/// * `row_offset` - the row at which this range begins.
/// * `mask` - the rows of the range that are candidates for selection.
///
/// If `mask` selects no rows (including the zero-length case) the scanner starts out complete
/// with no result, so the evaluator is never invoked. Without this, a scan with no filter
/// would project an empty selection and trip the "projected array cannot be empty" assertion,
/// and a scan with a filter would evaluate a conjunct over the whole range only to discard
/// the outcome.
pub(crate) fn new(scan: Arc<Scanner>, row_offset: u64, mask: Mask) -> Self {
    let state = if mask.true_count() == 0 {
        // Nothing can be selected, so there is nothing to filter or project.
        State::Ready(None)
    } else if !scan.rev_filter.is_empty() {
        // If we have a filter expression, then for now we evaluate it against all rows
        // of the range.
        // TODO(ngates): we should decide based on mask.true_count() whether to include the
        //  current mask or not. But note that the resulting expression would need to be
        //  aligned and intersected with the given mask.
        State::FilterEval((Mask::new_true(mask.len()), scan.rev_filter.to_vec()))
    } else {
        // If there is no filter expression, then we immediately perform a mask + project.
        State::Project((mask.clone(), scan.projection().clone()))
    };

    Self {
        scan,
        row_range: row_offset..row_offset + mask.len() as u64,
        mask,
        state,
    }
}
/// Report the operation the caller should perform next.
///
/// Returns [`NextOp::Ready`] once the scan is complete; otherwise returns [`NextOp::Eval`]
/// carrying the row range, the mask to apply, and the expression to evaluate. While
/// filtering, that expression is the last of the remaining conjuncts.
///
// FIXME(ngates): currently we have to clone the Mask to return it. Doing this
//  forces the eager evaluation of the iterators.
fn next(&self) -> NextOp {
    let (mask, expr) = match &self.state {
        State::Ready(array) => return NextOp::Ready(array.clone()),
        State::Project((mask, expr)) => (mask, expr),
        State::FilterEval((mask, conjuncts)) => {
            let conjunct = conjuncts
                .last()
                .vortex_expect("conjuncts is always non-empty");
            (mask, conjunct)
        }
    };

    NextOp::Eval((self.row_range.clone(), mask.clone(), expr.clone()))
}
/// Post the result of the last expression evaluation back to the range scan and advance
/// its state.
///
/// * While filtering, `result` is the outcome of the conjunct that was just evaluated, with
///   nulls treated as `false`. It is folded into the row selection, after which the scan
///   either completes with no result (nothing left selected), continues with the next
///   conjunct, or moves on to the projection.
/// * While projecting, `result` is the output of the scan and the scan completes.
/// * Once the scan is complete, `result` is ignored.
///
/// # Errors
///
/// Fails if the filter result cannot be null-filled or converted into a [`Mask`].
///
/// # Panics
///
/// Panics if the projected array is empty.
fn transition(mut self, result: Array) -> VortexResult<Self> {
    // Below this density of selected rows, the next conjunct is evaluated against the
    // current selection instead of against every row of the range.
    const APPLY_FILTER_SELECTIVITY_THRESHOLD: f64 = 0.2;

    self.state = match self.state {
        State::FilterEval((eval_mask, mut remaining)) => {
            // Discard the conjunct that was just evaluated; the vector is non-empty here.
            remaining.pop();

            // Rows for which the conjunct evaluated to null are not selected.
            let conjunct_mask = Mask::try_from(fill_null(result, false.into())?)?;

            // If the evaluator was handed an all-true mask, the result lines up bit-for-bit
            // with our row mask and a plain bitwise AND applies. If it was handed a
            // non-full mask, the result only covers the set bits of our row mask, so the
            // two have to be intersected by rank instead.
            let evaluated_all_rows = self.mask.len() == eval_mask.true_count();
            let selection = if evaluated_all_rows {
                self.mask.bitand(&conjunct_mask)
            } else {
                self.mask.intersect_by_rank(&conjunct_mask)
            };

            if selection.true_count() == 0 {
                // Nothing is selected any more, so we're done.
                State::Ready(None)
            } else if remaining.is_empty() {
                // All conjuncts have been applied: move on to the projection.
                State::Project((selection, self.scan.projection().clone()))
            } else {
                self.mask = selection;
                let next_eval_mask =
                    if self.mask.density() < APPLY_FILTER_SELECTIVITY_THRESHOLD {
                        self.mask.clone()
                    } else {
                        Mask::new_true(self.mask.len())
                    };
                // `remaining` is non-empty, which is what `FilterEval` requires.
                State::FilterEval((next_eval_mask, remaining))
            }
        }
        State::Project(_) => {
            // The projection was the final step.
            assert!(!result.is_empty(), "projected array cannot be empty");
            State::Ready(Some(result))
        }
        finished @ State::Ready(_) => finished,
    };

    Ok(self)
}
/// Run the [`RangeScanner`] to completion using a synchronous expression evaluator.
///
/// For every requested operation, `evaluator` is called with a [`RowMask`] built from the
/// mask and the start of the row range, together with the expression to evaluate. Its
/// result is posted back to the scanner until the scan is complete.
///
/// Returns `Ok(None)` when the filter leaves no rows selected.
///
/// # Errors
///
/// Propagates any error returned by `evaluator` or raised while advancing the scan.
pub fn evaluate<E>(mut self, evaluator: E) -> VortexResult<Option<Array>>
where
    E: Fn(RowMask, ExprRef) -> VortexResult<Array>,
{
    loop {
        let (row_range, mask, expr) = match self.next() {
            NextOp::Ready(array) => return Ok(array),
            NextOp::Eval(op) => op,
        };

        let row_mask = RowMask::new(mask, row_range.start);
        let evaluated = evaluator(row_mask, expr)?;
        self = self.transition(evaluated)?;
    }
}
/// Run the [`RangeScanner`] to completion using an async expression evaluator.
///
/// For every requested operation, `evaluator` is called with a [`RowMask`] built from the
/// mask and the start of the row range, together with the expression to evaluate. The
/// returned future is awaited and its result posted back to the scanner until the scan is
/// complete.
///
/// Returns `Ok(None)` when the filter leaves no rows selected.
///
/// # Errors
///
/// Propagates any error returned by `evaluator` or raised while advancing the scan.
pub async fn evaluate_async<E, F>(mut self, evaluator: E) -> VortexResult<Option<Array>>
where
    E: Fn(RowMask, ExprRef) -> F,
    F: Future<Output = VortexResult<Array>>,
{
    loop {
        let (row_range, mask, expr) = match self.next() {
            NextOp::Ready(array) => return Ok(array),
            NextOp::Eval(op) => op,
        };

        let row_mask = RowMask::new(mask, row_range.start);
        let evaluated = evaluator(row_mask, expr).await?;
        self = self.transition(evaluated)?;
    }
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use vortex_array::array::{BoolArray, PrimitiveArray, StructArray};
use vortex_array::compute::filter;
use vortex_array::variants::StructArrayTrait;
use vortex_array::{IntoArray, IntoArrayVariant};
use vortex_dtype::Nullability::NonNullable;
use vortex_dtype::PType::U64;
use vortex_dtype::{DType, StructDType};
use vortex_expr::{and, get_item, gt, ident, lit};
use vortex_mask::Mask;
use crate::{RangeScanner, Scanner};
/// The schema shared by the tests: a non-nullable struct with `u64` fields `a`, `b` and `c`.
fn dtype() -> DType {
    let fields = StructDType::new(
        vec!["a".into(), "b".into(), "c".into()].into(),
        vec![U64.into(), U64.into(), U64.into()],
    );
    DType::Struct(Arc::new(fields), NonNullable)
}
/// Three conjuncts that each reject only a narrow band of rows: the scan must return every
/// row that lies outside all three bands.
#[test]
fn range_scan_few_conj_filter_low_selectivity() {
    let expr_a = gt(get_item("a", ident()), lit(1));
    let expr_b = gt(get_item("b", ident()), lit(2));
    let expr_c = gt(get_item("c", ident()), lit(3));

    let filter_expr = and(expr_a.clone(), and(expr_b.clone(), expr_c.clone()));
    let scan = Arc::new(Scanner::new(dtype(), ident(), Some(filter_expr)).unwrap());

    let row_count = 1000;
    let scanner = RangeScanner::new(scan, 0, Mask::new_true(row_count));

    // The evaluator fabricates the full-length answer for each known expression and then
    // applies the row mask it was given.
    let scanned = scanner
        .evaluate(|row_mask, expr| {
            let unfiltered = if expr == expr_a.clone() {
                BoolArray::from_iter((0..row_mask.len()).map(|i| i <= 10 || i >= 30))
                    .into_array()
            } else if expr == expr_b.clone() {
                BoolArray::from_iter((0..row_mask.len()).map(|i| i <= 100 || i >= 130))
                    .into_array()
            } else if expr == expr_c.clone() {
                BoolArray::from_iter((0..row_mask.len()).map(|i| i <= 510 || i >= 530))
                    .into_array()
            } else if expr == ident() {
                let values = PrimitiveArray::from_iter(0..row_mask.len() as u64).into_array();
                StructArray::from_fields(
                    [("a", values.clone()), ("b", values.clone()), ("c", values)].as_slice(),
                )
                .unwrap()
                .into_array()
            } else {
                unreachable!("unexpected expression {:?}", expr)
            };
            filter(&unfiltered, row_mask.filter_mask())
        })
        .unwrap()
        .unwrap();

    assert!(scanned.as_struct_array().is_some());

    let expected: Vec<u64> = (0..row_count as u64)
        .filter(|&i| (i <= 10 || i >= 30) && (i <= 100 || i >= 130) && (i <= 510 || i >= 530))
        .collect();
    let column_a = scanned
        .into_struct()
        .unwrap()
        .maybe_null_field_by_name("a")
        .unwrap()
        .into_primitive()
        .unwrap();
    assert_eq!(column_a.as_slice::<u64>(), expected.as_slice());
}
/// Three conjuncts that each reject a wide, overlapping band of rows: the scan must return
/// only the rows that lie outside the union of the bands.
#[test]
fn range_scan_few_conj_filter_high_overlapping_selectivity() {
    let expr_a = gt(get_item("a", ident()), lit(1));
    let expr_b = gt(get_item("b", ident()), lit(2));
    let expr_c = gt(get_item("c", ident()), lit(3));

    let filter_expr = and(expr_a.clone(), and(expr_b.clone(), expr_c.clone()));
    let scan = Arc::new(Scanner::new(dtype(), ident(), Some(filter_expr)).unwrap());

    let row_count = 1000;
    let scanner = RangeScanner::new(scan, 0, Mask::new_true(row_count));

    // The evaluator fabricates the full-length answer for each known expression and then
    // applies the row mask it was given.
    let scanned = scanner
        .evaluate(|row_mask, expr| {
            let unfiltered = if expr == expr_a.clone() {
                BoolArray::from_iter((0..row_mask.len()).map(|i| i <= 10 || i >= 900))
                    .into_array()
            } else if expr == expr_b.clone() {
                BoolArray::from_iter((0..row_mask.len()).map(|i| i <= 100 || i >= 950))
                    .into_array()
            } else if expr == expr_c.clone() {
                BoolArray::from_iter((0..row_mask.len()).map(|i| i <= 210 || i >= 990))
                    .into_array()
            } else if expr == ident() {
                let values = PrimitiveArray::from_iter(0..row_mask.len() as u64).into_array();
                StructArray::from_fields(
                    [("a", values.clone()), ("b", values.clone()), ("c", values)].as_slice(),
                )
                .unwrap()
                .into_array()
            } else {
                unreachable!("unexpected expression {:?}", expr)
            };
            filter(&unfiltered, row_mask.filter_mask())
        })
        .unwrap()
        .unwrap();

    assert!(scanned.as_struct_array().is_some());

    let expected: Vec<u64> = (0..row_count as u64)
        .filter(|&i| i <= 10 || i >= 990)
        .collect();
    let column_a = scanned
        .into_struct()
        .unwrap()
        .maybe_null_field_by_name("a")
        .unwrap()
        .into_primitive()
        .unwrap();
    assert_eq!(column_a.as_slice::<u64>(), expected.as_slice());
}
}