use std::ops::BitAnd;
use std::ops::Range;
use std::sync::Arc;
use bit_vec::BitVec;
use futures::FutureExt;
use futures::future::BoxFuture;
use futures::future::ok;
use vortex_array::ArrayRef;
use vortex_array::MaskFuture;
use vortex_array::expr::Expression;
use vortex_error::VortexResult;
use vortex_mask::Mask;
use vortex_scan::selection::Selection;
use crate::LayoutReader;
use crate::scan::filter::FilterExpr;
/// A boxed `'static` future that resolves to a [`VortexResult`] of `A`; this is the
/// future type handed back by [`split_exec`].
pub type TaskFuture<A> = BoxFuture<'static, VortexResult<A>>;
/// Builds the future that evaluates a single row-range `split` of a scan.
///
/// The split is first narrowed by the context's selection; when that leaves no rows the
/// returned future resolves to `None` straight away. Otherwise a row mask is produced:
///
/// * without a filter, the selection mask is used directly, truncated by `limit` when one
///   is supplied (and `limit` is decremented by the number of rows kept);
/// * with a filter, every conjunct is first run through the reader's pruning evaluation,
///   then the conjuncts are fully evaluated in the order the filter hands them out.
///   `limit` is not consulted on this path.
///
/// The resulting mask drives the reader's projection evaluation, and the projected array
/// is passed through the context's mapper. A mask that ends up all-false yields `None`.
///
/// # Errors
///
/// Fails immediately if the reader cannot set up the projection evaluation. Errors from
/// pruning, filtering, projecting or mapping are reported by the returned future.
pub fn split_exec<A: 'static + Send>(
    ctx: Arc<TaskContext<A>>,
    split: Range<u64>,
    limit: Option<&mut u64>,
) -> VortexResult<TaskFuture<Option<A>>> {
    // Narrow the split down to the rows the selection asks for.
    let selected = ctx.selection.row_mask(&split);
    let row_range = selected.row_range();
    let row_mask = selected.mask().clone();
    if row_mask.all_false() {
        // Nothing selected in this split: no need to touch the reader at all.
        return Ok(ok(None).boxed());
    }

    let filter_mask = match ctx.filter.as_ref() {
        Some(filter) => {
            let reader = Arc::clone(&ctx.reader);
            let filter = Arc::clone(filter);
            let row_range = row_range.clone();
            let mask_len = row_mask.len();

            MaskFuture::new(mask_len, async move {
                let mut survivors = row_mask;

                // Pass 1: run the pruning evaluation for every conjunct in declaration
                // order. The dynamic-update version is captured *before* evaluating so a
                // later bump can be detected in pass 2.
                let mut pruned_at = vec![None; filter.conjuncts().len()];
                for (pos, conjunct) in filter.conjuncts().iter().enumerate() {
                    if survivors.all_false() {
                        return Ok(survivors);
                    }
                    pruned_at[pos] = filter.dynamic_updates(pos).map(|du| du.version());
                    let keep = reader
                        .pruning_evaluation(&row_range, conjunct, survivors.clone())?
                        .await?;
                    survivors = survivors.bitand(&keep);
                }

                // Pass 2: fully evaluate each conjunct exactly once, in whatever order the
                // filter chooses among those still pending.
                let mut pending = BitVec::from_elem(filter.conjuncts().len(), true);
                while let Some(pos) = filter.next_conjunct(&pending) {
                    pending.set(pos, false);
                    if survivors.all_false() {
                        return Ok(survivors);
                    }
                    let conjunct = &filter.conjuncts()[pos];

                    // Re-run pruning when the conjunct's dynamic version is newer than the
                    // one it was last pruned at (or it had none recorded).
                    let latest = filter.dynamic_updates(pos).map(|du| du.version());
                    if let Some(version) = latest
                        && pruned_at[pos].is_none_or(|seen| seen < version)
                    {
                        pruned_at[pos] = Some(version);
                        let keep = reader
                            .pruning_evaluation(&row_range, conjunct, survivors.clone())?
                            .await?;
                        survivors = survivors.bitand(&keep);
                    }
                    if survivors.all_false() {
                        return Ok(survivors);
                    }

                    let evaluated = reader
                        .filter_evaluation(&row_range, conjunct, MaskFuture::ready(survivors))?
                        .await?;
                    // Feed the observed density back to the filter.
                    filter.report_selectivity(pos, evaluated.density());
                    survivors = evaluated;
                }

                Ok(survivors)
            })
        }
        None => {
            // No filter: the mask is known right now, only the limit may shrink it.
            let limited = match limit {
                None => row_mask,
                Some(budget) if *budget == 0 => Mask::new_false(row_mask.len()),
                Some(budget) => {
                    let available = row_mask.true_count();
                    // A budget that does not fit in `usize` cannot constrain this split.
                    let take =
                        usize::try_from(*budget).map_or(available, |cap| cap.min(available));
                    let truncated = row_mask.limit(take);
                    *budget -= take as u64;
                    truncated
                }
            };
            MaskFuture::ready(limited)
        }
    };

    let projection_future =
        ctx.reader
            .projection_evaluation(&row_range, &ctx.projection, filter_mask.clone())?;
    let mapper = Arc::clone(&ctx.mapper);

    let task = async move {
        let final_mask = filter_mask.await?;
        if final_mask.all_false() {
            return Ok(None);
        }
        let projected = projection_future.await?;
        mapper(projected).map(Some)
    };
    Ok(task.boxed())
}
/// State shared by the tasks created through [`split_exec`]; `A` is the type each task
/// produces after the mapper has been applied to the projected array.
pub struct TaskContext<A> {
/// Row selection; `split_exec` asks it for the row mask covering each split.
pub selection: Selection,
/// Optional filter. When present its conjuncts are pruned and evaluated through the
/// reader to build the mask; when absent the selection mask is used directly.
pub filter: Option<Arc<FilterExpr>>,
/// Layout reader used for pruning, filter and projection evaluation.
pub reader: Arc<dyn LayoutReader>,
/// Expression handed to the reader's projection evaluation.
pub projection: Expression,
/// Converts the projected array into the task's output value.
pub mapper: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
}