datafusion-index-provider 0.1.0

A Rust crate that adds index-based query acceleration to DataFusion TableProviders

DataFusion Index Provider

A library that extends Apache DataFusion with index-based query acceleration for TableProvider implementations.

Overview

This crate implements a two-phase execution model for index-accelerated queries:

  1. Index Phase: Scan one or more secondary indexes to identify primary key values matching filter predicates
  2. Fetch Phase: Use primary key values to fetch complete records from underlying storage

This approach reduces I/O by limiting data retrieval to rows that satisfy the query predicates. It is most effective for selective queries, typically those matching fewer than 10% of rows.
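To make the two phases concrete, here is a minimal standard-library sketch. It does not use this crate's API: `Employee`, `index_phase`, `fetch_phase`, and `sample_table` are hypothetical names, and the in-memory maps merely stand in for a secondary index and the underlying storage.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical row type standing in for a record in underlying storage.
#[derive(Debug, Clone, PartialEq)]
struct Employee {
    id: u64,
    age: u32,
    department: String,
}

/// Index phase: look up the primary keys of rows whose age equals `age`.
/// The secondary index maps an indexed value to the primary keys holding it.
fn index_phase(age_index: &BTreeMap<u32, Vec<u64>>, age: u32) -> Vec<u64> {
    age_index.get(&age).cloned().unwrap_or_default()
}

/// Fetch phase: retrieve complete records for the given primary keys only.
fn fetch_phase(storage: &HashMap<u64, Employee>, keys: &[u64]) -> Vec<Employee> {
    keys.iter().filter_map(|k| storage.get(k).cloned()).collect()
}

/// Builds a three-row table plus a secondary index on `age`.
fn sample_table() -> (HashMap<u64, Employee>, BTreeMap<u32, Vec<u64>>) {
    let rows = [(1, 30, "Engineering"), (2, 41, "Sales"), (3, 30, "Sales")];
    let mut storage = HashMap::new();
    let mut age_index: BTreeMap<u32, Vec<u64>> = BTreeMap::new();
    for (id, age, dept) in rows {
        storage.insert(id, Employee { id, age, department: dept.to_string() });
        age_index.entry(age).or_default().push(id);
    }
    (storage, age_index)
}
```

Calling `index_phase(&age_index, 30)` on the sample table yields keys 1 and 3, and `fetch_phase` then touches only those two records rather than the whole table.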

Installation

Add to your Cargo.toml:

[dependencies]
datafusion-index-provider = "0.1.0"

Core Concepts

Index Trait

Implement the Index trait to define how your index scans for matching primary key values:

// Import paths assume the `datafusion` umbrella crate is a dependency
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::{SendableRecordBatchStream, Statistics};
use datafusion_index_provider::physical_plan::Index;

impl Index for MyIndex {
    fn name(&self) -> &str { "my_index" }
    fn column_name(&self) -> &str { "indexed_column" }
    fn index_schema(&self) -> SchemaRef {
        // Schema whose columns form the (possibly composite) primary key
        // e.g. create_index_schema([Field::new("id", DataType::UInt64, false)])
        todo!()
    }
    fn table_name(&self) -> &str { "my_table" }

    fn scan(&self, filters: &[Expr], limit: Option<usize>)
        -> Result<SendableRecordBatchStream> {
        // Return RecordBatch stream with primary key columns matching the filters
        todo!()
    }

    fn statistics(&self) -> Statistics { todo!() /* index statistics */ }
}
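As a rough illustration of what a scan does with a predicate and a limit, the following standard-library sketch models a sorted index over an `age` column. It is not an implementation of the `Index` trait: `AgeIndex`, `Op`, and the plain `Vec<u64>` return type are assumptions made for the example, whereas a real index would return a `SendableRecordBatchStream` of primary key columns.

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

/// Hypothetical comparison operators the toy index accepts.
enum Op {
    Eq,
    Lt,
    Gt,
}

/// Toy sorted index over an `age` column: indexed value -> primary keys.
struct AgeIndex {
    entries: BTreeMap<u32, Vec<u64>>,
}

impl AgeIndex {
    /// Builds the index from `(primary_key, age)` pairs.
    fn from_rows(rows: &[(u64, u32)]) -> Self {
        let mut entries: BTreeMap<u32, Vec<u64>> = BTreeMap::new();
        for &(id, age) in rows {
            entries.entry(age).or_default().push(id);
        }
        Self { entries }
    }

    /// Returns primary keys matching `age <op> value`, stopping after `limit` keys.
    fn scan(&self, op: Op, value: u32, limit: Option<usize>) -> Vec<u64> {
        // Translate the predicate into a key range over the sorted map.
        let range = match op {
            Op::Eq => (Bound::Included(value), Bound::Included(value)),
            Op::Lt => (Bound::Unbounded, Bound::Excluded(value)),
            Op::Gt => (Bound::Excluded(value), Bound::Unbounded),
        };
        self.entries
            .range(range)
            .flat_map(|(_, keys)| keys.iter().copied())
            .take(limit.unwrap_or(usize::MAX))
            .collect()
    }
}
```

Note that this toy index emits keys in order of the indexed value, not in primary key order, so it would not by itself qualify as an ordered input in the sense of `Index::is_ordered()`.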

RecordFetcher Trait

Implement RecordFetcher to define how complete records are retrieved using primary key values:

use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion_index_provider::physical_plan::fetcher::RecordFetcher;

#[async_trait]
impl RecordFetcher for MyTable {
    async fn fetch(&self, index_batch: RecordBatch) -> Result<RecordBatch> {
        // index_batch contains primary key columns as defined by index_schema()
        // Return complete records for those primary keys
        todo!()
    }
}

IndexedTableProvider Trait

Implement IndexedTableProvider on your TableProvider to expose available indexes:

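use std::sync::Arc;

use datafusion::error::Result;
use datafusion_index_provider::physical_plan::Index;
use datafusion_index_provider::provider::IndexedTableProvider;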

impl IndexedTableProvider for MyTable {
    fn indexes(&self) -> Result<Vec<Arc<dyn Index>>> {
        Ok(vec![
            Arc::new(MyAgeIndex::new()),
            Arc::new(MyDepartmentIndex::new()),
        ])
    }
}

Integration with TableProvider

Use the provided analysis and execution plan methods in your TableProvider::scan:

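use std::sync::Arc;

use async_trait::async_trait;
use datafusion::catalog::{Session, TableProvider};
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_index_provider::physical_plan::fetcher::RecordFetcher;

#[async_trait]
impl TableProvider for MyTable {
    // Required methods as_any(), schema(), and table_type() omitted for brevity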
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Analyze which filters can use indexes
        let (indexable, remaining) = self.analyze_and_optimize_filters(filters)?;

        if !indexable.is_empty() {
            // Create index-accelerated execution plan
            self.create_execution_plan_with_indexes(
                &indexable,
                projection,
                &remaining,
                limit,
                self.schema(),
                Arc::new(self.clone()) as Arc<dyn RecordFetcher>,
            ).await
        } else {
            // Fall back to regular table scan
            self.create_full_scan_plan(projection, filters, limit).await
        }
    }
}

Query Optimization

Filter Analysis

The analyze_and_optimize_filters method recursively analyzes filter expressions to build an IndexFilter tree:

  • Single predicates: Matched to indexes via Index::supports_predicate()
  • AND expressions: All sub-expressions must be indexable; creates IndexFilter::And
  • OR expressions: All sub-expressions must be indexable; creates IndexFilter::Or
  • Nested combinations: Supports arbitrary nesting depth

All-or-nothing policy: If any part of an AND/OR expression cannot be indexed, the entire expression is treated as non-indexable.
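The all-or-nothing rule can be sketched with a small recursive function. This is an illustration only, not the crate's code: `Filter` is a simplified stand-in for DataFusion's `Expr`, the `IndexFilter` enum here only imitates the shape of the crate's type, and the `analyze` and `pred` helpers, along with the `<column>_index` naming, are hypothetical.

```rust
/// Simplified stand-in for a filter expression (not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Filter {
    /// A predicate on a single column, e.g. `age > 25`.
    Predicate { column: String, text: String },
    And(Box<Filter>, Box<Filter>),
    Or(Box<Filter>, Box<Filter>),
}

/// Simplified stand-in for the crate's `IndexFilter` tree.
#[derive(Debug, Clone, PartialEq)]
enum IndexFilter {
    Single { index: String, predicate: String },
    And(Vec<IndexFilter>),
    Or(Vec<IndexFilter>),
}

/// Returns `Some(tree)` only when every leaf of `filter` maps to an index.
/// Assumption: a column `c` listed in `indexed_columns` has an index `c_index`.
fn analyze(filter: &Filter, indexed_columns: &[&str]) -> Option<IndexFilter> {
    match filter {
        Filter::Predicate { column, text } => indexed_columns
            .contains(&column.as_str())
            .then(|| IndexFilter::Single {
                index: format!("{column}_index"),
                predicate: text.clone(),
            }),
        // `?` propagates `None`, so one non-indexable child rejects the whole node.
        Filter::And(l, r) => Some(IndexFilter::And(vec![
            analyze(l, indexed_columns)?,
            analyze(r, indexed_columns)?,
        ])),
        Filter::Or(l, r) => Some(IndexFilter::Or(vec![
            analyze(l, indexed_columns)?,
            analyze(r, indexed_columns)?,
        ])),
    }
}

/// Convenience constructor for a leaf predicate.
fn pred(column: &str, text: &str) -> Filter {
    Filter::Predicate { column: column.to_string(), text: text.to_string() }
}
```

With indexes on `age` and `department`, `age > 25 AND department = 'Engineering'` produces an `IndexFilter::And` with two children, while `age < 25 OR name = 'Ann'` returns `None` because `name` has no index.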

Supported Query Patterns

-- Single index scan
SELECT * FROM employees WHERE age = 30;

-- Multi-index intersection (AND)
SELECT * FROM employees WHERE age > 25 AND department = 'Engineering';

-- Multi-index union (OR) with automatic deduplication
SELECT * FROM employees WHERE age < 25 OR department = 'Sales';

-- Complex nested expressions
SELECT * FROM employees
WHERE (age > 30 AND department = 'Engineering')
   OR (age < 25 AND department = 'Sales');

Execution Plans

The library generates optimized physical plans based on filter structure:

Single Index

RecordFetchExec
└── IndexScanExec(age_index, [age = 30])

AND - Index Intersection

Uses Hash or SortMerge joins to intersect primary key values:

RecordFetchExec
└── Projection(PK columns)
    └── HashJoin(on: PK columns)
        ├── IndexScanExec(age_index, [age > 25])
        └── IndexScanExec(dept_index, [department = 'Engineering'])

Join Selection:

  • SortMergeJoin: When both indexes return ordered primary keys (Index::is_ordered())
  • HashJoin: When inputs are unordered (builds hash table from left side)
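The difference between the two strategies can be seen in a standard-library sketch that intersects plain lists of primary keys. The functions `hash_intersect`, `merge_intersect`, and `intersect` are hypothetical and operate on `u64` keys; the crate itself delegates to DataFusion's join operators, and this sketch assumes each input contains a given key at most once.

```rust
use std::collections::HashSet;

/// Hash-based intersection: build a set from the left input, probe with the right.
/// Works for unordered inputs; output follows the right input's order.
fn hash_intersect(left: &[u64], right: &[u64]) -> Vec<u64> {
    let build: HashSet<u64> = left.iter().copied().collect();
    right.iter().copied().filter(|k| build.contains(k)).collect()
}

/// Merge-based intersection: requires both inputs sorted in ascending order.
fn merge_intersect(left: &[u64], right: &[u64]) -> Vec<u64> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::new();
    while i < left.len() && j < right.len() {
        match left[i].cmp(&right[j]) {
            std::cmp::Ordering::Less => i += 1,
            std::cmp::Ordering::Greater => j += 1,
            std::cmp::Ordering::Equal => {
                out.push(left[i]);
                i += 1;
                j += 1;
            }
        }
    }
    out
}

/// Picks the merge strategy only when both inputs are known to be ordered,
/// loosely mirroring the join selection rule described above.
fn intersect(left: &[u64], left_ordered: bool, right: &[u64], right_ordered: bool) -> Vec<u64> {
    if left_ordered && right_ordered {
        merge_intersect(left, right)
    } else {
        hash_intersect(left, right)
    }
}
```

The merge variant needs no auxiliary set and streams through both inputs once, which is why ordered inputs make it attractive; the hash variant trades memory for the freedom to accept keys in any order.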

OR - Union with Deduplication

Uses UnionExec + AggregateExec to deduplicate overlapping primary key values:

RecordFetchExec
└── AggregateExec(GROUP BY PK columns)
    └── UnionExec
        ├── IndexScanExec(age_index, [age < 25])
        └── IndexScanExec(dept_index, [department = 'Sales'])

This prevents duplicate record fetches when primary key values appear in multiple index results.
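The effect of the union-plus-aggregate plan can be approximated in a few lines of standard-library Rust. This is a sketch of the idea rather than the crate's operator: `union_dedup` is a hypothetical helper, and it is generic over the key type so that a tuple can stand in for a composite primary key.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Concatenates primary keys from several index scans and drops repeats,
/// keeping the first occurrence of each key.
fn union_dedup<K: Eq + Hash + Clone>(scans: &[Vec<K>]) -> Vec<K> {
    let mut seen = HashSet::new();
    let mut out = Vec::new();
    for key in scans.iter().flatten() {
        // `insert` returns false when the key was already present.
        if seen.insert(key.clone()) {
            out.push(key.clone());
        }
    }
    out
}
```

If the age scan returns keys 4, 9, 2 and the department scan returns 9, 7, 4, the fetch phase receives 4, 9, 2, 7, so each matching record is fetched once.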

Reference Implementation

See tests/common/ for complete working examples:

Single-column primary key:

Composite primary key:

Integration tests in tests/integration_tests.rs and tests/composite_pk_tests.rs demonstrate query scenarios including deeply nested AND/OR combinations with both single and composite primary keys.

Limitations

  1. Projection pushdown: Not currently supported for indexed scans (fetches all columns)
  2. Remaining filters: Non-indexed filters applied after record fetch, not during index scan
  3. Partitioning: Index scans produce single partition output

Testing

Run the test suite:

# Run all tests
cargo test

# Run with logging
RUST_LOG=debug cargo test

# Run integration tests only
cargo test --test integration_tests

The test suite includes:

  • 27 unit tests covering execution plan generation and streaming
  • 36 integration tests covering query scenarios from simple to deeply nested, with both single and composite primary keys

Compatibility

  • DataFusion: 52.x
  • Rust: 1.75+ (MSRV)
  • Arrow: Compatible with DataFusion's Arrow version

Architecture Details

For in-depth technical documentation, see:

License

Licensed under the Apache License, Version 2.0. See LICENSE for details.