vortex_scan/lib.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
//! The `vortex-scan` crate provides utilities for performing efficient scan operations.
//!
//! The [`Scanner`] object is responsible for storing state related to a scan operation, including
//! expression selectivity metrics, in order to continually optimize the execution plan for each
//! row-range of the scan.
#![deny(missing_docs)]
mod range_scan;
mod row_mask;
use std::sync::Arc;
pub use range_scan::*;
pub use row_mask::*;
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_expr::forms::cnf::cnf;
use vortex_expr::transform::simplify_typed::simplify_typed;
use vortex_expr::{lit, or, ExprRef};
/// Represents a scan operation to read data from a set of rows, with an optional filter expression,
/// and a projection expression.
///
/// A scan operation can be broken into many [`RangeScanner`] operations, each of which leverages
/// shared statistics from the parent [`Scanner`] to optimize the order in which filter and projection
/// operations are applied.
///
/// For example, if a filter expression has a top-level `AND` clause, it may be the case that one
/// clause is significantly more selective than the other. In this case, we may want to compute the
/// most selective filter first, then prune rows using the result of that filter, before evaluating
/// the second filter over the reduced set of rows.
///
/// Construct one with [`Scanner::new`], which simplifies the expressions against the input dtype
/// and splits the filter into its conjuncts.
#[derive(Debug, Clone)]
pub struct Scanner {
/// The projection expression, after simplification against the input dtype.
projection: ExprRef,
/// The conjuncts of the filter in conjunctive normal form, stored in reverse order so that the
/// final value can be popped each time without a shuffle. Each entry is the `OR` of the terms of
/// one clause. Empty when the scan has no filter.
rev_filter: Box<[ExprRef]>,
/// The dtype returned by `projection` when evaluated against the input dtype.
projection_dtype: DType,
// The fields below are commented out and therefore not part of the struct at present.
//
// A sorted list of row indices to include in the scan. We store row indices since they may
// produce a very sparse RowMask.
// take_indices: Vec<u64>,
// statistics: RwLock<Statistics>
}
impl Scanner {
    /// Builds a scan over rows of type `dtype` from a projection and an optional filter.
    ///
    /// Both expressions are simplified against `dtype`. When a filter is present it is converted
    /// to conjunctive normal form and each clause is folded into a single `OR` expression; the
    /// resulting conjuncts are stored in reverse order.
    ///
    /// # Errors
    ///
    /// Returns an error if simplifying either expression, computing the projection's return
    /// dtype, or converting the filter to conjunctive normal form fails.
    pub fn new(dtype: DType, projection: ExprRef, filter: Option<ExprRef>) -> VortexResult<Self> {
        let projection = simplify_typed(projection, &dtype)?;
        let simplified_filter = match filter {
            Some(expr) => Some(simplify_typed(expr, &dtype)?),
            None => None,
        };

        // TODO(ngates): compute and cache a FieldMask based on the referenced fields.
        //  Where FieldMask ~= Vec<FieldPath>
        let projection_dtype = projection.return_dtype(&dtype)?;

        let rev_filter: Box<[ExprRef]> = match simplified_filter {
            Some(expr) => cnf(expr)?
                .into_iter()
                // A clause with no terms is an empty disjunction, i.e. `false`.
                .map(|clause| clause.into_iter().reduce(or).unwrap_or_else(|| lit(false)))
                // Store the conjuncts back-to-front so the next one to evaluate can be popped
                // off the end without shuffling the remaining elements.
                .rev()
                .collect(),
            None => Box::new([]),
        };

        Ok(Self {
            projection,
            rev_filter,
            projection_dtype,
        })
    }

    /// Returns the (simplified) projection expression of this scan.
    pub fn projection(&self) -> &ExprRef {
        &self.projection
    }

    /// Returns the dtype produced by the projection.
    ///
    /// The value is computed once in [`Scanner::new`] against the input dtype and cached.
    pub fn result_dtype(&self) -> &DType {
        &self.projection_dtype
    }

    /// Creates a [`RangeScanner`] for the rows described by `row_mask`.
    ///
    /// The range scanner receives the shared handle to this parent scan together with the
    /// mask's `begin()` value and a clone of its filter mask.
    pub fn range_scanner(self: Arc<Self>, row_mask: RowMask) -> VortexResult<RangeScanner> {
        let begin = row_mask.begin();
        let mask = row_mask.filter_mask().clone();
        Ok(RangeScanner::new(self, begin, mask))
    }
}