use crate::physical_plan::exec::fetch::RecordFetchExec;
use crate::physical_plan::fetcher::RecordFetcher;
use crate::physical_plan::Index;
use crate::types::{IndexFilter, IndexFilters, UnionMode};
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::catalog::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::{Expr, Operator, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;
#[async_trait]
pub trait IndexedTableProvider: TableProvider + Sync + Send {
fn indexes(&self) -> Result<Vec<Arc<dyn Index>>>;
fn analyze_and_optimize_filters(&self, filters: &[Expr]) -> Result<(IndexFilters, Vec<Expr>)> {
let (indexed_filters, remaining_filters): (Vec<_>, Vec<_>) = filters
.iter()
.map(|expr| match self.build_index_filter(expr) {
Ok(Some(filter)) => Ok((Some(filter), None)),
Ok(None) => Ok((None, Some(expr.clone()))),
Err(e) => Err(e),
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.unzip();
let indexed_filters: Vec<_> = indexed_filters.into_iter().flatten().collect();
let remaining_filters: Vec<_> = remaining_filters.into_iter().flatten().collect();
let final_filters = match indexed_filters.len() {
0 => Vec::new(),
1 => indexed_filters,
_ => vec![IndexFilter::And(indexed_filters)],
};
Ok((final_filters, remaining_filters))
}
fn build_index_filter(&self, expr: &Expr) -> Result<Option<IndexFilter>> {
if let Expr::BinaryExpr(be) = expr {
let op = match be.op {
Operator::And => |l, r| Ok(Some(IndexFilter::And(vec![l, r]))),
Operator::Or => |l, r| Ok(Some(IndexFilter::Or(vec![l, r]))),
_ => {
return self.find_index_for_expr(expr);
}
};
let l_filter = self.build_index_filter(be.left.as_ref())?;
let r_filter = self.build_index_filter(be.right.as_ref())?;
if let (Some(l), Some(r)) = (l_filter, r_filter) {
return op(l, r);
} else {
return Ok(None);
}
}
self.find_index_for_expr(expr)
}
fn find_index_for_expr(&self, expr: &Expr) -> Result<Option<IndexFilter>> {
for index in self.indexes()? {
if index.supports_predicate(expr)? {
return Ok(Some(IndexFilter::Single {
index,
filter: expr.clone(),
}));
}
}
Ok(None)
}
fn union_mode(&self) -> UnionMode {
UnionMode::default()
}
fn supports_filters_index_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
let indexes = self.indexes()?;
filters
.iter()
.map(|filter| {
for index in &indexes {
if index.supports_predicate(filter)? {
return Ok(TableProviderFilterPushDown::Exact);
}
}
Ok(TableProviderFilterPushDown::Unsupported)
})
.collect()
}
async fn create_execution_plan_with_indexes(
&self,
indexes: &[IndexFilter],
_projection: Option<&Vec<usize>>,
_remaining_filters: &[Expr],
limit: Option<usize>,
schema: SchemaRef,
mapper: Arc<dyn RecordFetcher>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(RecordFetchExec::try_new(
indexes.to_vec(),
limit,
Arc::clone(&mapper),
schema.clone(),
self.union_mode(),
)?))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::physical_plan::Index;
    use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion::catalog::Session;
    use datafusion::common::Statistics;
    use datafusion::datasource::TableType;
    use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
    use datafusion::prelude::{col, lit};
    use datafusion_common::{DataFusionError, Result};
    use std::any::Any;

    /// Index stub bound to one column of table `table_name`.
    ///
    /// It does not override `supports_predicate`, so predicate matching comes from the `Index`
    /// trait's default. That default is presumably keyed on `column_name`; the tests below rely
    /// on an index on `a` accepting `a = 1` and rejecting `b = 1`. `scan` is never called here.
    #[derive(Debug)]
    struct MockIndex {
        column_name: String,
        table_name: String,
    }

    impl Index for MockIndex {
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn name(&self) -> &str {
            "mock_index"
        }
        fn index_schema(&self) -> SchemaRef {
            Arc::new(Schema::new(vec![Field::new("id", DataType::UInt32, false)]))
        }
        fn table_name(&self) -> &str {
            &self.table_name
        }
        fn column_name(&self) -> &str {
            &self.column_name
        }
        fn scan(
            &self,
            _filters: &[Expr],
            _limit: Option<usize>,
        ) -> Result<SendableRecordBatchStream, DataFusionError> {
            unimplemented!("MockIndex::scan")
        }
        fn statistics(&self) -> Statistics {
            Statistics::new_unknown(&self.index_schema())
        }
    }

    /// Table provider stub with a fixed `(a: Int32, b: Int32)` schema; only its indexes matter.
    #[derive(Debug)]
    struct MockTableProvider {
        indexes: Vec<Arc<dyn Index>>,
    }

    impl MockTableProvider {
        fn new(indexes: Vec<Arc<dyn Index>>) -> Self {
            Self { indexes }
        }
    }

    /// Builds a provider for table `t` carrying one [`MockIndex`] per entry in `columns`.
    fn provider_with_indexes(columns: &[&str]) -> MockTableProvider {
        let indexes = columns
            .iter()
            .map(|&column| {
                Arc::new(MockIndex {
                    column_name: column.into(),
                    table_name: "t".into(),
                }) as Arc<dyn Index>
            })
            .collect();
        MockTableProvider::new(indexes)
    }

    #[async_trait]
    impl TableProvider for MockTableProvider {
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn schema(&self) -> SchemaRef {
            Arc::new(Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Int32, false),
            ]))
        }
        fn table_type(&self) -> TableType {
            TableType::Base
        }
        async fn scan(
            &self,
            _state: &dyn Session,
            _projection: Option<&Vec<usize>>,
            _filters: &[Expr],
            _limit: Option<usize>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }
    }

    #[async_trait]
    impl IndexedTableProvider for MockTableProvider {
        fn indexes(&self) -> Result<Vec<Arc<dyn Index>>> {
            Ok(self.indexes.clone())
        }
    }

    #[test]
    fn test_analyze_simple_pushdown() -> Result<()> {
        let provider = provider_with_indexes(&["a"]);
        let filters = vec![col("a").eq(lit(1))];
        let (indexed, remaining) = provider.analyze_and_optimize_filters(&filters)?;
        assert_eq!(indexed.len(), 1);
        assert!(matches!(indexed[0], IndexFilter::Single { .. }));
        assert_eq!(remaining.len(), 0);
        Ok(())
    }

    #[test]
    fn test_analyze_no_pushdown() -> Result<()> {
        let provider = provider_with_indexes(&["a"]);
        let filters = vec![col("b").eq(lit(1))];
        let (indexed, remaining) = provider.analyze_and_optimize_filters(&filters)?;
        assert_eq!(indexed.len(), 0);
        assert_eq!(remaining.len(), 1);
        Ok(())
    }

    #[test]
    fn test_analyze_mixed_pushdown() -> Result<()> {
        let provider = provider_with_indexes(&["a"]);
        let filters = vec![col("a").eq(lit(1)), col("b").eq(lit(2))];
        let (indexed, remaining) = provider.analyze_and_optimize_filters(&filters)?;
        assert_eq!(indexed.len(), 1);
        assert!(matches!(indexed[0], IndexFilter::Single { .. }));
        assert_eq!(remaining.len(), 1);
        Ok(())
    }

    #[test]
    fn test_analyze_and_pushdown() -> Result<()> {
        let provider = provider_with_indexes(&["a", "b"]);
        let filters = vec![col("a").eq(lit(1)).and(col("b").eq(lit(2)))];
        let (indexed, remaining) = provider.analyze_and_optimize_filters(&filters)?;
        assert_eq!(indexed.len(), 1);
        assert!(matches!(indexed[0], IndexFilter::And { .. }));
        assert_eq!(remaining.len(), 0);
        Ok(())
    }

    #[test]
    fn test_analyze_or_pushdown() -> Result<()> {
        let provider = provider_with_indexes(&["a", "b"]);
        let filters = vec![col("a").eq(lit(1)).or(col("b").eq(lit(2)))];
        let (indexed, remaining) = provider.analyze_and_optimize_filters(&filters)?;
        assert_eq!(indexed.len(), 1);
        assert!(matches!(indexed[0], IndexFilter::Or { .. }));
        assert_eq!(remaining.len(), 0);
        Ok(())
    }

    #[test]
    fn test_analyze_complex_no_pushdown() -> Result<()> {
        let provider = provider_with_indexes(&["a"]);
        let filters = vec![col("a").eq(lit(1)).and(col("b").eq(lit(2)))];
        let (indexed, remaining) = provider.analyze_and_optimize_filters(&filters)?;
        assert_eq!(indexed.len(), 0);
        assert_eq!(remaining.len(), 1);
        Ok(())
    }

    /// Simple predicates: pushdown support mirrors what `analyze_and_optimize_filters` indexes.
    #[test]
    fn test_supports_pushdown_simple() -> Result<()> {
        let provider = provider_with_indexes(&["a"]);
        let on_indexed_column = col("a").eq(lit(1));
        let on_plain_column = col("b").eq(lit(1));
        let support =
            provider.supports_filters_index_pushdown(&[&on_indexed_column, &on_plain_column])?;
        assert_eq!(
            support,
            vec![
                TableProviderFilterPushDown::Exact,
                TableProviderFilterPushDown::Unsupported,
            ]
        );
        Ok(())
    }
}