datafusion-table-providers 0.12.0

Extend the capabilities of DataFusion to support additional data sources via implementations of the `TableProvider` trait.
use async_trait::async_trait;
use datafusion::catalog::Session;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::sql::sqlparser::ast::VisitMut;
use datafusion::sql::unparser::Unparser;
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;

use datafusion::{
    arrow::datatypes::SchemaRef,
    datasource::TableProvider,
    error::Result as DataFusionResult,
    logical_expr::{Expr, TableProviderFilterPushDown, TableType},
};

use crate::sql::sql_provider_datafusion::{default_filter_pushdown, SqlExec};
use crate::util::table_arg_replace::TableArgReplace;

use super::{into_table_args, ClickHouseTable};

impl ClickHouseTable {
    fn create_logical_plan(
        &self,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DataFusionResult<LogicalPlan> {
        let table_source = LogicalTableSource::new(self.schema.clone());
        LogicalPlanBuilder::scan_with_filters(
            self.table_reference.clone(),
            Arc::new(table_source),
            projection.cloned(),
            filters.to_vec(),
        )?
        .limit(0, limit)?
        .build()
    }

    fn create_physical_plan(
        &self,
        projection: Option<&Vec<usize>>,
        sql: String,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(SqlExec::new(
            projection,
            &self.schema(),
            self.pool.clone(),
            sql,
            Arc::new(datafusion::sql::unparser::dialect::DefaultDialect {}),
        )?))
    }
}

#[async_trait]
impl TableProvider for ClickHouseTable {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
        let filter_push_down = default_filter_pushdown(filters, &*self.dialect);
        Ok(filter_push_down)
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        let logical_plan = self.create_logical_plan(projection, filters, limit)?;
        let mut sql = Unparser::new(&*self.dialect).plan_to_sql(&logical_plan)?;

        if let Some(args) = self.args.clone() {
            let args = into_table_args(args);
            let mut table_args = TableArgReplace::new(vec![(self.table_reference.clone(), args)]);
            let _ = sql.visit(&mut table_args);
        }

        let sql = sql.to_string();
        return self.create_physical_plan(projection, sql);
    }
}

impl Display for ClickHouseTable {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ClickHouseTable {}", self.table_reference)
    }
}