datafusion_table_providers/clickhouse/
sql_table.rs

1use async_trait::async_trait;
2use datafusion::catalog::Session;
3use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
4use datafusion::physical_plan::ExecutionPlan;
5use datafusion::sql::sqlparser::ast::VisitMut;
6use datafusion::sql::unparser::Unparser;
7use std::fmt::Display;
8use std::sync::Arc;
9use std::{any::Any, fmt};
10
11use datafusion::{
12    arrow::datatypes::SchemaRef,
13    datasource::TableProvider,
14    error::Result as DataFusionResult,
15    logical_expr::{Expr, TableProviderFilterPushDown, TableType},
16};
17
18use crate::sql::sql_provider_datafusion::{default_filter_pushdown, SqlExec};
19use crate::util::table_arg_replace::TableArgReplace;
20
21use super::{into_table_args, ClickHouseTable};
22
23impl ClickHouseTable {
24    fn create_logical_plan(
25        &self,
26        projection: Option<&Vec<usize>>,
27        filters: &[Expr],
28        limit: Option<usize>,
29    ) -> DataFusionResult<LogicalPlan> {
30        let table_source = LogicalTableSource::new(self.schema.clone());
31        LogicalPlanBuilder::scan_with_filters(
32            self.table_reference.clone(),
33            Arc::new(table_source),
34            projection.cloned(),
35            filters.to_vec(),
36        )?
37        .limit(0, limit)?
38        .build()
39    }
40
41    fn create_physical_plan(
42        &self,
43        projection: Option<&Vec<usize>>,
44        sql: String,
45    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
46        Ok(Arc::new(SqlExec::new(
47            projection,
48            &self.schema(),
49            self.pool.clone(),
50            sql,
51        )?))
52    }
53}
54
55#[async_trait]
56impl TableProvider for ClickHouseTable {
57    fn as_any(&self) -> &dyn Any {
58        self
59    }
60
61    fn schema(&self) -> SchemaRef {
62        self.schema.clone()
63    }
64
65    fn table_type(&self) -> TableType {
66        TableType::Base
67    }
68
69    fn supports_filters_pushdown(
70        &self,
71        filters: &[&Expr],
72    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
73        let filter_push_down = default_filter_pushdown(filters, &*self.dialect);
74        Ok(filter_push_down)
75    }
76
77    async fn scan(
78        &self,
79        _state: &dyn Session,
80        projection: Option<&Vec<usize>>,
81        filters: &[Expr],
82        limit: Option<usize>,
83    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
84        let logical_plan = self.create_logical_plan(projection, filters, limit)?;
85        let mut sql = Unparser::new(&*self.dialect).plan_to_sql(&logical_plan)?;
86
87        if let Some(args) = self.args.clone() {
88            let args = into_table_args(args);
89            let mut table_args = TableArgReplace::new(vec![(self.table_reference.clone(), args)]);
90            let _ = sql.visit(&mut table_args);
91        }
92
93        let sql = sql.to_string();
94        return self.create_physical_plan(projection, sql);
95    }
96}
97
98impl Display for ClickHouseTable {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        write!(f, "ClickHouseTable {}", self.table_reference)
101    }
102}