indexlake-datafusion 0.4.0

IndexLake DataFusion integration
Documentation
use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion_catalog::{Session, TableProvider};
use datafusion_common::DataFusionError;
use datafusion_common::stats::Precision;
use datafusion_common::{DFSchema, Statistics};
use datafusion_expr::Expr;
use datafusion_expr::TableProviderFilterPushDown;
use datafusion_expr::TableType;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_plan::ExecutionPlan;
use indexlake::Client;
use indexlake::index::FilterSupport;
use indexlake::table::{Table, TableScanPartition};
use indexlake::utils::schema_without_row_id;
use log::warn;
use tokio::sync::Mutex;

use crate::{
    IndexLakeInsertExec, IndexLakeScanExec, datafusion_expr_to_indexlake_expr,
    indexlake_scalar_to_datafusion_scalar,
};

/// DataFusion [`TableProvider`] backed by an IndexLake [`Table`].
///
/// Built with [`IndexLakeTable::try_new`] and tuned through the `with_*`
/// builder methods.
#[derive(Debug)]
pub struct IndexLakeTable {
    /// Client handed to every [`LazyTable`] created for scan and insert plans.
    client: Arc<Client>,
    /// The already-loaded IndexLake table this provider exposes.
    table: Arc<Table>,
    /// Batch size forwarded to `IndexLakeScanExec` (defaults to 2048).
    batch_size: usize,
    /// Partition count forwarded to `IndexLakeScanExec` (defaults to 16).
    num_scan_partitions: usize,
    /// Literal default expressions keyed by field name, built from the
    /// table's field records that carry a default value.
    column_defaults: HashMap<String, Expr>,
    /// When `true`, `schema()` strips the row-id column and scan projection
    /// indices are shifted by one before being handed to IndexLake.
    hide_row_id: bool,
    /// Threshold forwarded unchanged to `IndexLakeInsertExec` (defaults to 1000).
    // NOTE(review): the exact meaning of the threshold is defined by
    // `IndexLakeInsertExec`, which is not visible here.
    bypass_insert_threshold: usize,
}

impl IndexLakeTable {
    pub fn try_new(client: Arc<Client>, table: Arc<Table>) -> Result<Self, DataFusionError> {
        let mut column_defaults = HashMap::new();
        for field_record in table.field_records.iter() {
            if let Some(default_value) = &field_record.default_value {
                let scalar_value = indexlake_scalar_to_datafusion_scalar(default_value)?;
                column_defaults.insert(
                    field_record.field_name.clone(),
                    Expr::Literal(scalar_value, None),
                );
            }
        }
        Ok(Self {
            client,
            table,
            batch_size: 2048,
            num_scan_partitions: 16,
            column_defaults,
            hide_row_id: false,
            bypass_insert_threshold: 1000,
        })
    }

    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    pub fn with_num_scan_partitions(mut self, num_scan_partitions: usize) -> Self {
        self.num_scan_partitions = num_scan_partitions;
        self
    }

    pub fn with_hide_row_id(mut self, hide_row_id: bool) -> Self {
        self.hide_row_id = hide_row_id;
        self
    }

    pub fn with_bypass_insert_threshold(mut self, bypass_insert_threshold: usize) -> Self {
        self.bypass_insert_threshold = bypass_insert_threshold;
        self
    }
}

#[async_trait::async_trait]
impl TableProvider for IndexLakeTable {
    /// Returns `self` as [`std::any::Any`] so callers can downcast to the
    /// concrete provider type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Schema exposed to DataFusion: the table's output schema, with the
    /// row-id column removed when `hide_row_id` is set.
    fn schema(&self) -> SchemaRef {
        if self.hide_row_id {
            Arc::new(schema_without_row_id(&self.table.output_schema))
        } else {
            self.table.output_schema.clone()
        }
    }

    /// Always reports a base table.
    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Looks up the default expression registered for `column`, if any.
    fn get_column_default(&self, column: &str) -> Option<&Expr> {
        self.column_defaults.get(column)
    }

    /// Builds an [`IndexLakeScanExec`] for the requested projection, filters
    /// and limit.
    ///
    /// `projection` indexes into [`TableProvider::schema`]; it is translated
    /// into indices of the table's full output schema before being passed on.
    ///
    /// # Errors
    ///
    /// Returns `DataFusionError::Internal` when the data-file count or the
    /// data-file records cannot be fetched, and propagates any error from
    /// `IndexLakeScanExec::try_new`.
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        let data_file_count = self
            .table
            .data_file_count()
            .await
            .map_err(|e| DataFusionError::Internal(e.to_string()))?;
        // Only fetch the data-file records up front for tables with at most
        // 1000 data files; beyond that the scan exec receives `None`.
        let data_files = if data_file_count > 1000 {
            None
        } else {
            let records = self
                .table
                .data_file_records()
                .await
                .map_err(|e| DataFusionError::Internal(e.to_string()))?;
            Some(Arc::new(records))
        };

        // Translate DataFusion's projection (relative to `schema()`) into
        // indices of the full output schema. With the row id hidden, every
        // exposed column sits one position later in the output schema, and a
        // missing projection must still exclude the row-id column so the plan
        // matches the advertised schema.
        let il_projection = match (projection, self.hide_row_id) {
            (Some(df_projection), true) => {
                Some(df_projection.iter().map(|i| i + 1).collect::<Vec<_>>())
            }
            (None, true) => {
                Some((1..self.table.output_schema.fields().len()).collect::<Vec<_>>())
            }
            (_, false) => projection.cloned(),
        };

        let lazy_table = LazyTable::new(
            self.client.clone(),
            self.table.namespace_name.clone(),
            self.table.table_name.clone(),
        )
        .with_table(self.table.clone());

        let exec = IndexLakeScanExec::try_new(
            lazy_table,
            self.table.output_schema.clone(),
            self.num_scan_partitions,
            data_files,
            il_projection,
            filters.to_vec(),
            self.batch_size,
            limit,
        )?;
        Ok(Arc::new(exec))
    }

    /// Reports, per filter, how far IndexLake can evaluate it.
    ///
    /// Filters that cannot be converted to an IndexLake expression are
    /// reported as `Unsupported`; the rest are classified by
    /// `Table::supports_filter`.
    ///
    /// # Errors
    ///
    /// Fails when the output schema cannot be converted to a `DFSchema`, or
    /// with `DataFusionError::Internal` when `supports_filter` fails.
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>, DataFusionError> {
        let df_schema = DFSchema::try_from(self.table.output_schema.clone())?;
        let mut supports = Vec::with_capacity(filters.len());
        for filter in filters {
            let Ok(il_expr) = datafusion_expr_to_indexlake_expr(filter, &df_schema) else {
                supports.push(TableProviderFilterPushDown::Unsupported);
                continue;
            };
            // `il_expr` is not needed afterwards, so it is moved rather than cloned.
            let support = self
                .table
                .supports_filter(il_expr)
                .map_err(|e| DataFusionError::Internal(e.to_string()))?;
            supports.push(match support {
                FilterSupport::Exact => TableProviderFilterPushDown::Exact,
                FilterSupport::Inexact => TableProviderFilterPushDown::Inexact,
                FilterSupport::Unsupported => TableProviderFilterPushDown::Unsupported,
            });
        }
        Ok(supports)
    }

    /// Returns the table's row count as exact statistics, or `None` when it
    /// cannot be obtained.
    ///
    /// The count is awaited synchronously through `block_in_place`, which
    /// panics on a current-thread runtime, so the call is skipped unless a
    /// multi-threaded tokio runtime is active. Column statistics are sized
    /// from [`TableProvider::schema`] so they agree with the advertised
    /// schema when the row id is hidden.
    fn statistics(&self) -> Option<Statistics> {
        let handle = match tokio::runtime::Handle::try_current() {
            Ok(handle)
                if matches!(
                    handle.runtime_flavor(),
                    tokio::runtime::RuntimeFlavor::MultiThread
                ) =>
            {
                handle
            }
            _ => {
                warn!(
                    "[indexlake] Skipping row count for indexlake table {}.{}: no multi-threaded tokio runtime available",
                    self.table.namespace_name, self.table.table_name
                );
                return None;
            }
        };

        let row_count_result = tokio::task::block_in_place(|| {
            handle.block_on(async {
                self.table
                    .count(TableScanPartition::single_partition())
                    .await
            })
        });
        match row_count_result {
            Ok(row_count) => Some(Statistics {
                num_rows: Precision::Exact(row_count),
                total_byte_size: Precision::Absent,
                column_statistics: Statistics::unknown_column(&self.schema()),
            }),
            Err(e) => {
                warn!(
                    "[indexlake] Error getting indexlake table {}.{} row count: {:?}",
                    self.table.namespace_name, self.table.table_name, e
                );
                None
            }
        }
    }

    /// Builds an [`IndexLakeInsertExec`] that writes the rows produced by
    /// `input` into this table using `insert_op`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `IndexLakeInsertExec::try_new`.
    async fn insert_into(
        &self,
        _state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        let lazy_table = LazyTable::new(
            self.client.clone(),
            self.table.namespace_name.clone(),
            self.table.table_name.clone(),
        )
        .with_table(self.table.clone());

        let insert_exec = IndexLakeInsertExec::try_new(
            lazy_table,
            input,
            insert_op,
            self.bypass_insert_threshold,
        )?;

        Ok(Arc::new(insert_exec))
    }
}

/// A lazy-loaded table holder containing client and table metadata.
/// The actual Table is loaded on first access if not provided upfront.
///
/// Clones share the same cached table slot because the slot lives behind an
/// `Arc`.
#[derive(Debug, Clone)]
pub struct LazyTable {
    /// Client used to load the table when it has not been cached yet.
    pub client: Arc<Client>,
    /// Namespace passed to `Client::load_table`.
    pub namespace_name: String,
    /// Table name passed to `Client::load_table`.
    pub table_name: String,
    /// Cached table; `None` until it is supplied via `with_table` or loaded
    /// by `get_or_load`.
    table: Arc<Mutex<Option<Arc<Table>>>>,
}

impl LazyTable {
    /// Create a new LazyTable without a pre-loaded table.
    pub fn new(client: Arc<Client>, namespace_name: String, table_name: String) -> Self {
        Self {
            client,
            namespace_name,
            table_name,
            table: Arc::new(Mutex::new(None)),
        }
    }

    /// Create a new LazyTable with a pre-loaded table.
    pub fn with_table(mut self, table: Arc<Table>) -> Self {
        self.table = Arc::new(Mutex::new(Some(table)));
        self
    }

    /// Get the table, loading it lazily if not already loaded.
    pub async fn get_or_load(&self) -> Result<Arc<Table>, indexlake::ILError> {
        let mut guard = self.table.lock().await;
        if let Some(table) = guard.as_ref() {
            return Ok(table.clone());
        }

        let table = self
            .client
            .load_table(&self.namespace_name, &self.table_name)
            .await?;
        let table = Arc::new(table);
        *guard = Some(table.clone());
        Ok(table)
    }
}