use std::iter;
use bit_vec::BitVec;
use itertools::Itertools;
use parking_lot::RwLock;
use sketches_ddsketch::DDSketch;
use vortex_array::expr::Expression;
use vortex_array::expr::forms::conjuncts;
use vortex_array::scalar_fn::fns::dynamic::DynamicExprUpdates;
use vortex_error::VortexExpect;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
/// Default quantile read from each conjunct's selectivity sketch; the value at this
/// quantile is the per-conjunct estimate that conjuncts are ordered by.
const DEFAULT_SELECTIVITY_QUANTILE: f64 = 0.1;
/// A filter expression split into its conjuncts, together with per-conjunct selectivity
/// sketches that are used to maintain a preferred evaluation order.
///
/// The sketches and the ordering sit behind locks, so they can be updated through `&self`.
pub struct FilterExpr {
/// The conjuncts obtained by passing the filter expression to `conjuncts`.
conjuncts: Vec<Expression>,
/// One sketch per conjunct (same indexing as `conjuncts`) holding every selectivity
/// value reported for that conjunct.
conjunct_selectivity: Vec<RwLock<DDSketch>>,
/// The result of `DynamicExprUpdates::new` for each conjunct (same indexing as
/// `conjuncts`). NOTE(review): `None` presumably means the conjunct has no dynamic
/// parts — confirm against `DynamicExprUpdates::new`.
dynamic_conjuncts: Vec<Option<DynamicExprUpdates>>,
/// Conjunct indices in preferred evaluation order; re-sorted ascending by the
/// selectivity estimate whenever a report leaves it out of order.
ordering: RwLock<Vec<usize>>,
/// The quantile read from each conjunct's sketch to obtain its selectivity estimate.
selectivity_quantile: f64,
}
impl FilterExpr {
    /// Builds a [`FilterExpr`] by splitting `expr` into its conjuncts.
    ///
    /// Every conjunct starts with an empty selectivity sketch, the initial evaluation
    /// order is the order in which the conjuncts were produced, and the default
    /// selectivity quantile is used.
    pub fn new(expr: Expression) -> Self {
        let conjuncts = conjuncts(&expr);
        let num_conjuncts = conjuncts.len();
        let dynamic_conjuncts = conjuncts
            .iter()
            .map(DynamicExprUpdates::new)
            .collect_vec();

        Self {
            conjuncts,
            // `repeat_with` builds a fresh sketch per conjunct; nothing is cloned or shared.
            conjunct_selectivity: iter::repeat_with(|| RwLock::new(DDSketch::default()))
                .take(num_conjuncts)
                .collect(),
            dynamic_conjuncts,
            ordering: RwLock::new((0..num_conjuncts).collect()),
            selectivity_quantile: DEFAULT_SELECTIVITY_QUANTILE,
        }
    }

    /// Returns the conjuncts of the filter, in their original (not preferred) order.
    #[inline]
    pub fn conjuncts(&self) -> &[Expression] {
        &self.conjuncts
    }

    /// Returns the dynamic-expression updates recorded for the conjunct at
    /// `conjunct_idx`, if any.
    ///
    /// # Panics
    ///
    /// Panics if `conjunct_idx` is not a valid conjunct index.
    #[inline]
    pub fn dynamic_updates(&self, conjunct_idx: usize) -> Option<&DynamicExprUpdates> {
        self.dynamic_conjuncts[conjunct_idx].as_ref()
    }

    /// Returns the index of the most preferred conjunct whose bit is set in `remaining`,
    /// or `None` when no conjunct in the ordering has its bit set.
    ///
    /// # Panics
    ///
    /// Indexes `remaining` by conjunct index, so `remaining` must cover every conjunct
    /// that is inspected before a match is found.
    #[inline]
    pub fn next_conjunct(&self, remaining: &BitVec) -> Option<usize> {
        let preferred = self.ordering.read();
        // The ordering is kept sorted by selectivity estimate, so the first hit wins.
        preferred.iter().copied().find(|&idx| remaining[idx])
    }

    /// Records an observed `selectivity` for the conjunct at `conjunct_idx` and, if the
    /// per-conjunct estimates are no longer in ascending order, re-sorts the preferred
    /// evaluation order.
    ///
    /// # Panics
    ///
    /// Panics if `selectivity` is outside `[0.0, 1.0]` (this also rejects NaN), or if
    /// `conjunct_idx` is not a valid conjunct index.
    pub fn report_selectivity(&self, conjunct_idx: usize, selectivity: f64) {
        if !(0.0..=1.0).contains(&selectivity) {
            vortex_panic!(
                "selectivity {} must be in the range [0.0, 1.0]",
                selectivity
            );
        }

        // The write guard is a temporary, so the sketch lock is released at the end of
        // this statement and is never held together with the read locks taken below.
        self.conjunct_selectivity[conjunct_idx]
            .write()
            .add(selectivity);

        // Current estimate for every conjunct, indexed like `self.conjuncts`.
        let all_selectivity = self
            .conjunct_selectivity
            .iter()
            .map(|histogram| {
                histogram
                    .read()
                    .quantile(self.selectivity_quantile)
                    .map_err(|e| vortex_err!("{e}"))
                    .vortex_expect("quantile out of range")
                    // A sketch without a quantile value falls back to 0.0.
                    .unwrap_or_default()
            })
            .collect::<Vec<_>>();

        // Fast path: only a read lock is needed when the ordering is still valid.
        {
            let ordering = self.ordering.read();
            if ordering.is_sorted_by_key(|&idx| all_selectivity[idx]) {
                return;
            }
        }

        let mut ordering = self.ordering.write();
        // `total_cmp` is a total order over `f64`, so the comparator cannot fail (and
        // therefore cannot panic while the write lock is held).
        ordering
            .sort_unstable_by(|&l_idx, &r_idx| {
                all_selectivity[l_idx].total_cmp(&all_selectivity[r_idx])
            });

        tracing::debug!(
            "Reordered conjuncts based on new selectivity {:?}",
            ordering
                .iter()
                .map(|&idx| format!("({}) => {}", self.conjuncts[idx], all_selectivity[idx]))
                .join(", ")
        );
    }
}