use std::sync::{Arc, LazyLock};
use super::utils::{IndexMetrics, InstrumentedRecordBatchStreamAdapter};
use crate::{
Dataset,
dataset::rowids::load_row_id_sequences,
index::{DatasetIndexInternalExt, prefilter::DatasetPreFilter},
};
use arrow_array::{Array, RecordBatch, UInt64Array};
use arrow_schema::{Schema, SchemaRef};
use async_recursion::async_recursion;
use async_trait::async_trait;
use datafusion::{
common::{Statistics, stats::Precision},
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
execution_plan::{Boundedness, EmissionType},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
},
scalar::ScalarValue,
};
use datafusion_physical_expr::EquivalenceProperties;
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt, stream::BoxStream};
use lance_core::utils::mask::RowSetOps;
use lance_core::{
Error, ROW_ID_FIELD, Result,
utils::{
address::RowAddress,
mask::{RowAddrMask, RowAddrTreeMap},
},
};
use lance_datafusion::{
chunker::break_stream,
utils::{
ExecutionPlanMetricsSetExt, SCALAR_INDEX_SEARCH_TIME_METRIC, SCALAR_INDEX_SER_TIME_METRIC,
},
};
use lance_index::{
DatasetIndexExt, IndexCriteria,
metrics::MetricsCollector,
scalar::{
SargableQuery, ScalarIndex,
expression::{
INDEX_EXPR_RESULT_SCHEMA, IndexExprResult, ScalarIndexExpr, ScalarIndexLoader,
ScalarIndexSearch,
},
},
};
use lance_table::format::Fragment;
use roaring::RoaringBitmap;
use tracing::{debug_span, instrument};
#[async_trait]
impl ScalarIndexLoader for Dataset {
async fn load_index(
&self,
column: &str,
index_name: &str,
metrics: &dyn MetricsCollector,
) -> Result<Arc<dyn ScalarIndex>> {
let idx = self
.load_scalar_index(IndexCriteria::default().with_name(index_name))
.await?
.ok_or_else(|| Error::internal(format!("Scanner created plan for index query on index {} for column {} but no usable index exists with that name", index_name, column)))?;
self.open_scalar_index(column, &idx.uuid.to_string(), metrics)
.await
}
}
/// Leaf physical plan node that evaluates a [`ScalarIndexExpr`] against a dataset's scalar
/// indices and emits the serialized result as a single batch.
#[derive(Debug)]
pub struct ScalarIndexExec {
    /// Dataset whose scalar indices are queried.
    dataset: Arc<Dataset>,
    /// Index expression to evaluate.
    expr: ScalarIndexExpr,
    /// Cached plan properties (single partition, bounded, incremental emission).
    properties: PlanProperties,
    /// Metrics recorded during execution (index search and serialization timing).
    metrics: ExecutionPlanMetricsSet,
}
impl DisplayAs for ScalarIndexExec {
    /// Renders the node name and its query; the tree view puts the query on its own line.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let expr = &self.expr;
        match t {
            DisplayFormatType::TreeRender => write!(f, "ScalarIndexQuery\nquery={}", expr),
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "ScalarIndexQuery: query={}", expr)
            }
        }
    }
}
impl ScalarIndexExec {
pub fn new(dataset: Arc<Dataset>, expr: ScalarIndexExpr) -> Self {
let properties = PlanProperties::new(
EquivalenceProperties::new(INDEX_EXPR_RESULT_SCHEMA.clone()),
Partitioning::RoundRobinBatch(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
Self {
dataset,
expr,
properties,
metrics: ExecutionPlanMetricsSet::new(),
}
}
#[async_recursion]
async fn fragments_covered_by_index_query(
index_expr: &ScalarIndexExpr,
dataset: &Dataset,
) -> Result<RoaringBitmap> {
match index_expr {
ScalarIndexExpr::And(lhs, rhs) => {
Ok(Self::fragments_covered_by_index_query(lhs, dataset).await?
& Self::fragments_covered_by_index_query(rhs, dataset).await?)
}
ScalarIndexExpr::Or(lhs, rhs) => {
Ok(Self::fragments_covered_by_index_query(lhs, dataset).await?
& Self::fragments_covered_by_index_query(rhs, dataset).await?)
}
ScalarIndexExpr::Not(expr) => {
Self::fragments_covered_by_index_query(expr, dataset).await
}
ScalarIndexExpr::Query(search_key) => {
let idx = dataset
.load_scalar_index(IndexCriteria::default().with_name(&search_key.index_name))
.await?
.expect("Index not found even though it must have been found earlier");
Ok(idx
.fragment_bitmap
.expect("scalar indices should always have a fragment bitmap"))
}
}
}
async fn do_execute(
expr: ScalarIndexExpr,
dataset: Arc<Dataset>,
plan_metrics: ExecutionPlanMetricsSet,
) -> Result<RecordBatch> {
let metrics = IndexMetrics::new(&plan_metrics, 0);
let query_result = {
let search_time = plan_metrics.new_time(SCALAR_INDEX_SEARCH_TIME_METRIC, 0);
let _timer = search_time.timer();
expr.evaluate(dataset.as_ref(), &metrics).await?
};
let fragments_covered_by_result =
Self::fragments_covered_by_index_query(&expr, dataset.as_ref()).await?;
{
let ser_time = plan_metrics.new_time(SCALAR_INDEX_SER_TIME_METRIC, 0);
let _timer = ser_time.timer();
query_result.serialize_to_arrow(&fragments_covered_by_result)
}
}
}
impl ExecutionPlan for ScalarIndexExec {
fn name(&self) -> &str {
"ScalarIndexExec"
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> SchemaRef {
INDEX_EXPR_RESULT_SCHEMA.clone()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
if !children.is_empty() {
Err(datafusion::error::DataFusionError::Internal(
"ScalarIndexExec does not have children".to_string(),
))
} else {
Ok(self)
}
}
fn execute(
&self,
partition: usize,
_context: Arc<datafusion::execution::context::TaskContext>,
) -> datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream> {
let batch_fut = Self::do_execute(
self.expr.clone(),
self.dataset.clone(),
self.metrics.clone(),
);
let stream = futures::stream::iter(vec![batch_fut])
.then(|batch_fut| batch_fut.map_err(|err| err.into()))
.boxed()
as BoxStream<'static, datafusion::common::Result<RecordBatch>>;
Ok(Box::pin(InstrumentedRecordBatchStreamAdapter::new(
INDEX_EXPR_RESULT_SCHEMA.clone(),
stream,
partition,
&self.metrics,
)))
}
fn statistics(&self) -> datafusion::error::Result<datafusion::physical_plan::Statistics> {
Ok(Statistics {
num_rows: Precision::Exact(2),
..Statistics::new_unknown(&INDEX_EXPR_RESULT_SCHEMA)
})
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn supports_limit_pushdown(&self) -> bool {
false
}
}
/// Output schema of [`MapIndexExec`]: a single row id column.
pub static INDEX_LOOKUP_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    let fields = vec![ROW_ID_FIELD.clone()];
    Arc::new(Schema::new(fields))
});
/// Physical plan node that maps batches of column values produced by `input` to the row
/// addresses of matching rows, using a scalar index. Output uses [`INDEX_LOOKUP_SCHEMA`].
#[derive(Debug)]
pub struct MapIndexExec {
    /// Dataset that owns the index.
    dataset: Arc<Dataset>,
    /// Column the looked-up values belong to.
    column_name: String,
    /// Name of the scalar index used for the lookups.
    index_name: String,
    /// Child plan producing the values to look up (only its first column is read).
    input: Arc<dyn ExecutionPlan>,
    /// Cached plan properties (single partition, bounded, incremental emission).
    properties: PlanProperties,
    /// Metrics recorded during execution.
    metrics: ExecutionPlanMetricsSet,
}
impl DisplayAs for MapIndexExec {
    /// Every display format renders the same fixed label.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let label = match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => "IndexedLookup",
        };
        f.write_str(label)
    }
}
impl MapIndexExec {
pub fn new(
dataset: Arc<Dataset>,
column_name: String,
index_name: String,
input: Arc<dyn ExecutionPlan>,
) -> Self {
let properties = PlanProperties::new(
EquivalenceProperties::new(INDEX_LOOKUP_SCHEMA.clone()),
Partitioning::RoundRobinBatch(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
Self {
dataset,
column_name,
index_name,
input,
properties,
metrics: ExecutionPlanMetricsSet::new(),
}
}
async fn map_batch(
column_name: String,
index_name: String,
dataset: Arc<Dataset>,
deletion_mask: Option<Arc<RowAddrMask>>,
batch: RecordBatch,
metrics: Arc<IndexMetrics>,
) -> datafusion::error::Result<RecordBatch> {
let index_vals = batch.column(0);
let index_vals = (0..index_vals.len())
.map(|idx| ScalarValue::try_from_array(index_vals, idx))
.collect::<datafusion::error::Result<Vec<_>>>()?;
let query = ScalarIndexExpr::Query(ScalarIndexSearch {
column: column_name,
index_name,
query: Arc::new(SargableQuery::IsIn(index_vals)),
needs_recheck: false,
});
let query_result = query.evaluate(dataset.as_ref(), metrics.as_ref()).await?;
let IndexExprResult::Exact(mut row_addr_mask) = query_result else {
todo!("Support for non-exact query results as input for merge_insert")
};
if let Some(deletion_mask) = deletion_mask.as_ref() {
row_addr_mask = row_addr_mask & deletion_mask.as_ref().clone();
}
let row_id_iter = row_addr_mask
.iter_addrs()
.ok_or(datafusion::error::DataFusionError::Internal(
"IndexedLookupExec: Cannot iterate over row addresses (BlockList or contains full fragments)".to_string(),
))?;
let allow_list: UInt64Array = row_id_iter.map(u64::from).collect();
Ok(RecordBatch::try_new(
INDEX_LOOKUP_SCHEMA.clone(),
vec![Arc::new(allow_list)],
)?)
}
async fn do_execute(
input: datafusion::physical_plan::SendableRecordBatchStream,
dataset: Arc<Dataset>,
column_name: String,
index_name: String,
metrics: Arc<IndexMetrics>,
) -> datafusion::error::Result<
impl Stream<Item = datafusion::error::Result<RecordBatch>> + Send + 'static,
> {
let index = dataset
.load_scalar_index(IndexCriteria::default().with_name(&index_name))
.await?
.unwrap();
let deletion_mask_fut =
DatasetPreFilter::create_deletion_mask(dataset.clone(), index.fragment_bitmap.unwrap());
let deletion_mask = if let Some(deletion_mask_fut) = deletion_mask_fut {
Some(deletion_mask_fut.await?)
} else {
None
};
Ok(input.and_then(move |res| {
let column_name = column_name.clone();
let index_name = index_name.clone();
let dataset = dataset.clone();
let deletion_mask = deletion_mask.clone();
let metrics = metrics.clone();
Self::map_batch(
column_name,
index_name,
dataset,
deletion_mask,
res,
metrics,
)
}))
}
}
impl ExecutionPlan for MapIndexExec {
fn name(&self) -> &str {
"MapIndexExec"
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> SchemaRef {
INDEX_LOOKUP_SCHEMA.clone()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
if children.len() != 1 {
Err(datafusion::error::DataFusionError::Internal(
"MapIndexExec requires exactly one child".to_string(),
))
} else {
Ok(Arc::new(Self::new(
self.dataset.clone(),
self.column_name.clone(),
self.index_name.clone(),
children.into_iter().next().unwrap(),
)))
}
}
fn execute(
&self,
partition: usize,
context: Arc<datafusion::execution::TaskContext>,
) -> datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream> {
let index_vals = self.input.execute(partition, context)?;
let metrics = Arc::new(IndexMetrics::new(&self.metrics, partition));
let stream_fut = Self::do_execute(
index_vals,
self.dataset.clone(),
self.column_name.clone(),
self.index_name.clone(),
metrics,
);
let stream = futures::stream::iter(vec![stream_fut])
.then(|stream_fut| stream_fut)
.try_flatten()
.boxed();
Ok(Box::pin(InstrumentedRecordBatchStreamAdapter::new(
INDEX_LOOKUP_SCHEMA.clone(),
stream,
partition,
&self.metrics,
)))
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn supports_limit_pushdown(&self) -> bool {
false
}
}
/// Output schema of [`MaterializeIndexExec`]: a single row id column.
pub static MATERIALIZE_INDEX_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    let fields = vec![ROW_ID_FIELD.clone()];
    Arc::new(Schema::new(fields))
});
/// Leaf physical plan node that evaluates a [`ScalarIndexExpr`], applies the deletion mask
/// of `fragments`, and materializes the selected row ids as batches of
/// [`MATERIALIZE_INDEX_SCHEMA`].
#[derive(Debug)]
pub struct MaterializeIndexExec {
    /// Dataset whose scalar indices are queried.
    dataset: Arc<Dataset>,
    /// Index expression to evaluate.
    expr: ScalarIndexExpr,
    /// Fragments whose row ids may be emitted.
    fragments: Arc<Vec<Fragment>>,
    /// Cached plan properties (single partition, bounded, incremental emission).
    properties: PlanProperties,
    /// Metrics recorded during execution.
    metrics: ExecutionPlanMetricsSet,
}
impl DisplayAs for MaterializeIndexExec {
    /// Renders the node name and its query; the tree view puts the query on its own line.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let expr = &self.expr;
        match t {
            DisplayFormatType::TreeRender => write!(f, "MaterializeIndex\nquery={}", expr),
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "MaterializeIndex: query={}", expr)
            }
        }
    }
}
/// Iterator over every physical row address of a slice of fragments, in fragment order.
///
/// # Panics
///
/// `next` panics if it reaches a fragment without `physical_rows` recorded.
struct FragIdIter<'a> {
    /// Fragments to walk.
    src: &'a [Fragment],
    /// Position in `src` of the fragment currently being walked.
    frag_idx: usize,
    /// Next row offset to emit within the current fragment.
    idx_in_frag: usize,
}

impl<'a> FragIdIter<'a> {
    /// Starts at row 0 of the first fragment.
    fn new(src: &'a [Fragment]) -> Self {
        Self {
            src,
            frag_idx: 0,
            idx_in_frag: 0,
        }
    }
}

impl Iterator for FragIdIter<'_> {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        // Move fragment by fragment until one still has rows left, or the slice runs out.
        loop {
            let frag = self.src.get(self.frag_idx)?;
            let row_count = frag
                .physical_rows
                .expect("Fragment doesn't have physical rows recorded");
            if self.idx_in_frag >= row_count {
                self.frag_idx += 1;
                self.idx_in_frag = 0;
                continue;
            }
            let addr = RowAddress::new_from_parts(frag.id as u32, self.idx_in_frag as u32);
            self.idx_in_frag += 1;
            return Some(addr.into());
        }
    }
}
impl MaterializeIndexExec {
pub fn new(
dataset: Arc<Dataset>,
expr: ScalarIndexExpr,
fragments: Arc<Vec<Fragment>>,
) -> Self {
let properties = PlanProperties::new(
EquivalenceProperties::new(MATERIALIZE_INDEX_SCHEMA.clone()),
Partitioning::RoundRobinBatch(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
Self {
dataset,
expr,
fragments,
properties,
metrics: ExecutionPlanMetricsSet::new(),
}
}
#[instrument(name = "materialize_scalar_index", skip_all, level = "debug")]
async fn do_execute(
expr: ScalarIndexExpr,
dataset: Arc<Dataset>,
fragments: Arc<Vec<Fragment>>,
metrics: Arc<IndexMetrics>,
) -> Result<RecordBatch> {
let expr_result = expr.evaluate(dataset.as_ref(), metrics.as_ref());
let span = debug_span!("create_prefilter");
let prefilter = span.in_scope(|| {
let fragment_bitmap =
RoaringBitmap::from_iter(fragments.iter().map(|frag| frag.id as u32));
DatasetPreFilter::create_deletion_mask(dataset.clone(), fragment_bitmap)
});
let mask = if let Some(prefilter) = prefilter {
let (expr_result, prefilter) = futures::try_join!(expr_result, prefilter)?;
let mask = match expr_result {
IndexExprResult::Exact(mask) => mask,
IndexExprResult::AtMost(mask) => mask,
IndexExprResult::AtLeast(_) => todo!("Support AtLeast in MaterializeIndexExec"),
};
mask & (*prefilter).clone()
} else {
let expr_result = expr_result.await?;
match expr_result {
IndexExprResult::Exact(mask) => mask,
IndexExprResult::AtMost(mask) => mask,
IndexExprResult::AtLeast(_) => todo!("Support AtLeast in MaterializeIndexExec"),
}
};
let ids = row_ids_for_mask(mask, &dataset, &fragments).await?;
let ids = UInt64Array::from(ids);
Ok(RecordBatch::try_new(
MATERIALIZE_INDEX_SCHEMA.clone(),
vec![Arc::new(ids)],
)?)
}
}
/// Converts a row-address mask into the row ids it selects within `fragments`.
///
/// With stable row ids, ids come from the fragments' row id sequences. Otherwise they are the
/// physical row addresses of `fragments`, enumerated with [`FragIdIter`].
///
/// # Errors
///
/// Propagates failures while loading row id sequences.
#[instrument(name = "make_row_ids", skip(mask, dataset, fragments))]
async fn row_ids_for_mask(
    mask: RowAddrMask,
    dataset: &Dataset,
    fragments: &[Fragment],
) -> Result<Vec<u64>> {
    match mask {
        // An empty block list selects every row of `fragments`.
        RowAddrMask::BlockList(block_list) if block_list.is_empty() => {
            if dataset.manifest.uses_stable_row_ids() {
                let sequences = load_row_id_sequences(dataset, fragments)
                    .map_ok(|(_frag_id, sequence)| sequence)
                    .try_collect::<Vec<_>>()
                    .await?;
                let capacity = sequences.iter().map(|seq| seq.len() as usize).sum();
                let mut row_ids = Vec::with_capacity(capacity);
                for sequence in sequences {
                    row_ids.extend(sequence.iter());
                }
                Ok(row_ids)
            } else {
                Ok(FragIdIter::new(fragments).collect::<Vec<_>>())
            }
        }
        RowAddrMask::AllowList(mut allow_list) => {
            retain_fragments(&mut allow_list, fragments, dataset).await?;
            if let Some(allow_list_iter) = allow_list.row_addrs() {
                Ok(allow_list_iter.map(u64::from).collect::<Vec<_>>())
            } else {
                // The allow list holds whole fragments, so enumerate their rows explicitly.
                debug_assert!(!dataset.manifest.uses_stable_row_ids());
                Ok(FragIdIter::new(fragments)
                    .filter(|row_id| allow_list.contains(*row_id))
                    .collect())
            }
        }
        RowAddrMask::BlockList(block_list) => {
            if dataset.manifest.uses_stable_row_ids() {
                let sequences = load_row_id_sequences(dataset, fragments)
                    .map_ok(|(_frag_id, sequence)| sequence)
                    .try_collect::<Vec<_>>()
                    .await?;
                let total_rows: usize = sequences.iter().map(|seq| seq.len() as usize).sum();
                // The block list length is only a capacity hint. It may be unknown (`None`)
                // or may count ids that are not part of `fragments`, so it must neither panic
                // nor underflow the subtraction.
                let blocked = block_list.len().map_or(0, |len| len as usize);
                let mut row_ids = Vec::with_capacity(total_rows.saturating_sub(blocked));
                for sequence in sequences {
                    row_ids.extend(
                        sequence
                            .iter()
                            .filter(|row_id| !block_list.contains(*row_id)),
                    );
                }
                Ok(row_ids)
            } else {
                Ok(FragIdIter::new(fragments)
                    .filter(|row_id| !block_list.contains(*row_id))
                    .collect())
            }
        }
    }
}
/// Restricts `allow_list` to ids that belong to `fragments`.
///
/// Without stable row ids this filters by fragment id. With stable row ids, it intersects
/// the list with the union of the fragments' row id sequences.
///
/// # Errors
///
/// Propagates failures while loading row id sequences.
async fn retain_fragments(
    allow_list: &mut RowAddrTreeMap,
    fragments: &[Fragment],
    dataset: &Dataset,
) -> Result<()> {
    if !dataset.manifest.uses_stable_row_ids() {
        allow_list.retain_fragments(fragments.iter().map(|frag| frag.id as u32));
        return Ok(());
    }
    let fragment_row_ids = load_row_id_sequences(dataset, fragments)
        .try_fold(
            RowAddrTreeMap::new(),
            |mut acc, (_frag_id, sequence)| async move {
                acc |= RowAddrTreeMap::from(sequence.as_ref());
                Ok(acc)
            },
        )
        .await?;
    *allow_list &= &fragment_row_ids;
    Ok(())
}
impl ExecutionPlan for MaterializeIndexExec {
fn name(&self) -> &str {
"MaterializeIndexExec"
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> SchemaRef {
MATERIALIZE_INDEX_SCHEMA.clone()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
if !children.is_empty() {
Err(datafusion::error::DataFusionError::Internal(
"MaterializeIndexExec does not have children".to_string(),
))
} else {
Ok(self)
}
}
fn execute(
&self,
partition: usize,
context: Arc<datafusion::execution::context::TaskContext>,
) -> datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream> {
let metrics = Arc::new(IndexMetrics::new(&self.metrics, partition));
let batch_fut = Self::do_execute(
self.expr.clone(),
self.dataset.clone(),
self.fragments.clone(),
metrics,
);
let stream = futures::stream::iter(vec![batch_fut])
.then(|batch_fut| batch_fut.map_err(|err| err.into()))
.boxed()
as BoxStream<'static, datafusion::common::Result<RecordBatch>>;
let stream = Box::pin(RecordBatchStreamAdapter::new(
MATERIALIZE_INDEX_SCHEMA.clone(),
stream,
));
let stream = break_stream(stream, context.session_config().batch_size());
Ok(Box::pin(InstrumentedRecordBatchStreamAdapter::new(
MATERIALIZE_INDEX_SCHEMA.clone(),
stream.map_err(|err| err.into()),
partition,
&self.metrics,
)))
}
fn statistics(&self) -> datafusion::error::Result<datafusion::physical_plan::Statistics> {
Ok(Statistics::new_unknown(&MATERIALIZE_INDEX_SCHEMA))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn supports_limit_pushdown(&self) -> bool {
false
}
}
#[cfg(test)]
mod tests {
    use std::{ops::Bound, sync::Arc};

    use arrow::datatypes::UInt64Type;
    use datafusion::{
        execution::TaskContext, physical_plan::ExecutionPlan, prelude::SessionConfig,
        scalar::ScalarValue,
    };
    use futures::TryStreamExt;
    use lance_core::utils::tempfile::TempStrDir;
    use lance_datagen::gen_batch;
    use lance_index::{
        DatasetIndexExt, IndexType,
        scalar::{
            SargableQuery, ScalarIndexParams,
            expression::{ScalarIndexExpr, ScalarIndexSearch},
        },
    };

    use crate::{
        Dataset,
        io::exec::scalar_index::MaterializeIndexExec,
        utils::test::{DatagenExt, FragmentCount, FragmentRowCount, NoContextTestFixture},
    };

    use super::{MapIndexExec, ScalarIndexExec};

    /// An indexed test dataset plus the guard that keeps its temporary directory alive.
    struct TestFixture {
        dataset: Arc<Dataset>,
        _tmp_dir_guard: TempStrDir,
    }

    /// Writes 10 fragments of 10 rows with a stepped `ordered` column and builds a BTree
    /// index on that column.
    async fn test_fixture() -> TestFixture {
        let tmp_dir = TempStrDir::default();
        let mut dataset = gen_batch()
            .col("ordered", lance_datagen::array::step::<UInt64Type>())
            .into_dataset(
                tmp_dir.as_str(),
                FragmentCount::from(10),
                FragmentRowCount::from(10),
            )
            .await
            .unwrap();
        dataset
            .create_index(
                &["ordered"],
                IndexType::BTree,
                None,
                &ScalarIndexParams::default(),
                true,
            )
            .await
            .unwrap();
        TestFixture {
            dataset: Arc::new(dataset),
            _tmp_dir_guard: tmp_dir,
        }
    }

    /// Index query for `ordered < 47` against the index named `ordered_idx`.
    fn ordered_less_than_47() -> ScalarIndexExpr {
        ScalarIndexExpr::Query(ScalarIndexSearch {
            column: "ordered".to_string(),
            index_name: "ordered_idx".to_string(),
            query: Arc::new(SargableQuery::Range(
                Bound::Unbounded,
                Bound::Excluded(ScalarValue::UInt64(Some(47))),
            )),
            needs_recheck: false,
        })
    }

    #[tokio::test]
    async fn test_materialize_index_exec() {
        let TestFixture {
            dataset,
            _tmp_dir_guard,
        } = test_fixture().await;
        let fragments = dataset.fragments().clone();
        let plan = MaterializeIndexExec::new(dataset, ordered_less_than_47(), fragments);

        // With the default batch size every match fits in one batch.
        let batches = plan
            .execute(0, Arc::new(TaskContext::default()))
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 47);

        // A batch size of 5 splits the 47 matches into 10 batches.
        let small_batch_ctx =
            TaskContext::default().with_session_config(SessionConfig::default().with_batch_size(5));
        let batches = plan
            .execute(0, Arc::new(small_batch_ctx))
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(batches.len(), 10);
        assert_eq!(batches[0].num_rows(), 5);
    }

    /// Calling `execute` on each node must succeed outside an async runtime; the returned
    /// streams are never polled.
    #[test]
    fn no_context_scalar_index() {
        let fixture = NoContextTestFixture::new();
        let dataset = Arc::new(fixture.dataset);
        let query = ordered_less_than_47();

        let scalar_plan = ScalarIndexExec::new(dataset.clone(), query.clone());
        scalar_plan
            .execute(0, Arc::new(TaskContext::default()))
            .unwrap();

        let map_plan = MapIndexExec::new(
            dataset.clone(),
            "ordered".to_string(),
            "ordered_idx".to_string(),
            Arc::new(scalar_plan),
        );
        map_plan.execute(0, Arc::new(TaskContext::default())).unwrap();

        let materialize_plan =
            MaterializeIndexExec::new(dataset.clone(), query, dataset.fragments().clone());
        materialize_plan
            .execute(0, Arc::new(TaskContext::default()))
            .unwrap();
    }
}