indexlake-datafusion 0.2.0

IndexLake datafusion integration
Documentation
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

use datafusion::arrow::array::{RecordBatch, RecordBatchOptions};
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::common::stats::Precision;
use datafusion::common::{DFSchema, Statistics, project_schema};
use datafusion::error::DataFusionError;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::display::ProjectSchemaDisplay;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::limit::LimitStream;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
};
use datafusion::prelude::Expr;
use futures::{StreamExt, TryStreamExt};
use indexlake::catalog::DataFileRecord;
use indexlake::table::{Table, TableScan, TableScanPartition};
use log::error;

use crate::datafusion_expr_to_indexlake_expr;

#[derive(Debug)]
pub struct IndexLakeScanExec {
    pub table: Arc<Table>,
    pub partition_count: usize,
    pub data_files: Option<Arc<Vec<DataFileRecord>>>,
    pub concurrency: Option<usize>,
    pub projection: Option<Vec<usize>>,
    pub filters: Vec<Expr>,
    pub limit: Option<usize>,
    pub data_file_partition_ranges: Option<Vec<Option<Range<usize>>>>,
    properties: PlanProperties,
}

impl IndexLakeScanExec {
    pub fn try_new(
        table: Arc<Table>,
        partition_count: usize,
        data_files: Option<Arc<Vec<DataFileRecord>>>,
        concurrency: Option<usize>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        limit: Option<usize>,
    ) -> Result<Self, DataFusionError> {
        let projected_schema = project_schema(&table.output_schema, projection.as_ref())?;
        let properties = PlanProperties::new(
            EquivalenceProperties::new(projected_schema),
            Partitioning::UnknownPartitioning(partition_count),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let data_file_partition_ranges = data_files
            .as_ref()
            .map(|files| calc_data_file_partition_ranges(partition_count, files.len()));
        Ok(Self {
            table,
            partition_count,
            data_files,
            concurrency,
            projection,
            filters,
            limit,
            data_file_partition_ranges,
            properties,
        })
    }

    pub fn get_scan_partition(&self, partition: Option<usize>) -> TableScanPartition {
        match partition {
            Some(partition) => {
                if let Some(data_files) = self.data_files.as_ref()
                    && let Some(data_file_partition_ranges) =
                        self.data_file_partition_ranges.as_ref()
                {
                    let range = data_file_partition_ranges[partition].clone();
                    TableScanPartition::Provided {
                        contains_inline_rows: partition == 0,
                        data_file_records: if let Some(range) = range {
                            data_files[range].to_vec()
                        } else {
                            vec![]
                        },
                    }
                } else {
                    TableScanPartition::Auto {
                        partition_idx: partition,
                        partition_count: self.partition_count,
                    }
                }
            }
            None => TableScanPartition::single_partition(),
        }
    }
}

impl ExecutionPlan for IndexLakeScanExec {
    fn name(&self) -> &str {
        "IndexLakeScanExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        Ok(self)
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream, DataFusionError> {
        if partition >= self.partition_count {
            return Err(DataFusionError::Execution(format!(
                "partition index out of range: {partition} >= {}",
                self.partition_count
            )));
        }

        let df_schema = DFSchema::try_from(self.table.output_schema.clone())?;
        let il_filters = self
            .filters
            .iter()
            .map(|f| datafusion_expr_to_indexlake_expr(f, &df_schema))
            .collect::<Result<Vec<_>, _>>()?;

        let scan_partition = self.get_scan_partition(Some(partition));

        let mut scan = TableScan::default()
            .with_projection(self.projection.clone())
            .with_filters(il_filters)
            .with_partition(scan_partition);

        if let Some(limit) = self.limit {
            scan.batch_size = limit;
            scan.concurrency = limit / 10000 + 1;
        }

        // Override auto concurrency
        if let Some(concurrency) = self.concurrency {
            scan.concurrency = concurrency;
        }

        let projected_schema = self.schema();
        let fut = get_batch_stream(
            self.table.clone(),
            projected_schema.clone(),
            scan,
            self.limit,
        );
        let stream = futures::stream::once(fut).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            projected_schema,
            stream,
        )))
    }

    fn partition_statistics(
        &self,
        partition: Option<usize>,
    ) -> Result<Statistics, DataFusionError> {
        let scan_partition = self.get_scan_partition(partition);

        let row_count_result = tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current()
                .block_on(async { self.table.count(scan_partition).await })
        });
        match row_count_result {
            Ok(row_count) => {
                if self.filters.is_empty() {
                    if let Some(limit) = self.limit {
                        Ok(Statistics {
                            num_rows: Precision::Exact(std::cmp::min(row_count, limit)),
                            total_byte_size: Precision::Absent,
                            column_statistics: Statistics::unknown_column(&self.schema()),
                        })
                    } else {
                        Ok(Statistics {
                            num_rows: Precision::Exact(row_count),
                            total_byte_size: Precision::Absent,
                            column_statistics: Statistics::unknown_column(&self.schema()),
                        })
                    }
                } else {
                    Ok(Statistics {
                        num_rows: Precision::Inexact(row_count),
                        total_byte_size: Precision::Absent,
                        column_statistics: Statistics::unknown_column(&self.schema()),
                    })
                }
            }
            Err(e) => Err(DataFusionError::Plan(format!(
                "Error getting indexlake table {}.{} row count: {:?}",
                self.table.namespace_name, self.table.table_name, e
            ))),
        }
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        match IndexLakeScanExec::try_new(
            self.table.clone(),
            self.partition_count,
            self.data_files.clone(),
            self.concurrency,
            self.projection.clone(),
            self.filters.clone(),
            limit,
        ) {
            Ok(exec) => Some(Arc::new(exec)),
            Err(e) => {
                error!("[indexlake] Failed to create IndexLakeScanExec with fetch: {e}");
                None
            }
        }
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }
}

impl DisplayAs for IndexLakeScanExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "IndexLakeScanExec: table={}.{}, partitions={}",
            self.table.namespace_name, self.table.table_name, self.partition_count
        )?;
        let projected_schema = self.schema();
        if !schema_projection_equals(&projected_schema, &self.table.output_schema) {
            write!(
                f,
                ", projection={}",
                ProjectSchemaDisplay(&projected_schema)
            )?;
        }
        if !self.filters.is_empty() {
            write!(
                f,
                ", filters=[{}]",
                self.filters
                    .iter()
                    .map(|f| f.to_string())
                    .collect::<Vec<_>>()
                    .join(", ")
            )?;
        }
        if let Some(limit) = self.limit {
            write!(f, ", limit={limit}")?;
        }
        if let Some(concurrency) = self.concurrency {
            write!(f, ", concurrency={concurrency}")?;
        }
        Ok(())
    }
}

fn schema_projection_equals(left: &Schema, right: &Schema) -> bool {
    if left.fields.len() != right.fields.len() {
        return false;
    }
    for (left_field, right_field) in left.fields.iter().zip(right.fields.iter()) {
        if left_field.name() != right_field.name() {
            return false;
        }
    }
    true
}

async fn get_batch_stream(
    table: Arc<Table>,
    projected_schema: SchemaRef,
    mut scan: TableScan,
    limit: Option<usize>,
) -> Result<SendableRecordBatchStream, DataFusionError> {
    let stream = if scan.projection == Some(Vec::new()) {
        scan.projection = Some(vec![0]);
        let stream = table
            .scan(scan)
            .await
            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
        stream
            .map(|batch| {
                let batch = batch?;
                let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
                let new_batch =
                    RecordBatch::try_new_with_options(Arc::new(Schema::empty()), vec![], &options)?;
                Ok(new_batch)
            })
            .boxed()
    } else {
        table
            .scan(scan)
            .await
            .map_err(|e| DataFusionError::Execution(e.to_string()))?
    };
    let stream = stream.map_err(|e| DataFusionError::Execution(e.to_string()));
    let stream = Box::pin(RecordBatchStreamAdapter::new(projected_schema, stream));
    let metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
    let limit_stream = LimitStream::new(stream, 0, limit, metrics);
    Ok(Box::pin(limit_stream))
}

fn calc_data_file_partition_ranges(
    partition_count: usize,
    data_file_count: usize,
) -> Vec<Option<Range<usize>>> {
    let mut partition_allocations = vec![0; partition_count];

    if partition_count > data_file_count {
        for partition_allocation in partition_allocations.iter_mut().take(data_file_count) {
            *partition_allocation = 1;
        }
    } else {
        let partition_size = data_file_count / partition_count;
        for partition_allocation in partition_allocations.iter_mut() {
            *partition_allocation = partition_size;
        }

        let left = data_file_count - partition_count * partition_size;
        for partition_allocation in partition_allocations.iter_mut().take(left) {
            *partition_allocation += 1;
        }
    }

    let mut ranges = Vec::with_capacity(partition_count);
    let mut start = 0usize;
    for partition_allocation in partition_allocations.iter() {
        if *partition_allocation == 0 {
            ranges.push(None);
        } else {
            let partition_range = start..start + *partition_allocation;
            ranges.push(Some(partition_range));
            start += *partition_allocation;
        }
    }

    ranges
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_partition_data_file_range() {
        let ranges = calc_data_file_partition_ranges(2, 0);
        assert_eq!(ranges, vec![None, None]);

        let ranges = calc_data_file_partition_ranges(2, 1);
        assert_eq!(ranges, vec![Some(0..1), None]);

        let ranges = calc_data_file_partition_ranges(2, 2);
        assert_eq!(ranges, vec![Some(0..1), Some(1..2)]);

        let ranges = calc_data_file_partition_ranges(2, 3);
        assert_eq!(ranges, vec![Some(0..2), Some(2..3)]);

        let ranges = calc_data_file_partition_ranges(2, 4);
        assert_eq!(ranges, vec![Some(0..2), Some(2..4)]);
    }
}