datafusion_table_providers/clickhouse/
sql_table.rs1use async_trait::async_trait;
2use datafusion::catalog::Session;
3use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource};
4use datafusion::physical_plan::ExecutionPlan;
5use datafusion::sql::sqlparser::ast::VisitMut;
6use datafusion::sql::unparser::Unparser;
7use std::fmt::Display;
8use std::sync::Arc;
9use std::{any::Any, fmt};
10
11use datafusion::{
12 arrow::datatypes::SchemaRef,
13 datasource::TableProvider,
14 error::Result as DataFusionResult,
15 logical_expr::{Expr, TableProviderFilterPushDown, TableType},
16};
17
18use crate::sql::sql_provider_datafusion::{default_filter_pushdown, SqlExec};
19use crate::util::table_arg_replace::TableArgReplace;
20
21use super::{into_table_args, ClickHouseTable};
22
23impl ClickHouseTable {
24 fn create_logical_plan(
25 &self,
26 projection: Option<&Vec<usize>>,
27 filters: &[Expr],
28 limit: Option<usize>,
29 ) -> DataFusionResult<LogicalPlan> {
30 let table_source = LogicalTableSource::new(self.schema.clone());
31 LogicalPlanBuilder::scan_with_filters(
32 self.table_reference.clone(),
33 Arc::new(table_source),
34 projection.cloned(),
35 filters.to_vec(),
36 )?
37 .limit(0, limit)?
38 .build()
39 }
40
41 fn create_physical_plan(
42 &self,
43 projection: Option<&Vec<usize>>,
44 sql: String,
45 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
46 Ok(Arc::new(SqlExec::new(
47 projection,
48 &self.schema(),
49 self.pool.clone(),
50 sql,
51 )?))
52 }
53}
54
55#[async_trait]
56impl TableProvider for ClickHouseTable {
57 fn as_any(&self) -> &dyn Any {
58 self
59 }
60
61 fn schema(&self) -> SchemaRef {
62 self.schema.clone()
63 }
64
65 fn table_type(&self) -> TableType {
66 TableType::Base
67 }
68
69 fn supports_filters_pushdown(
70 &self,
71 filters: &[&Expr],
72 ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
73 let filter_push_down = default_filter_pushdown(filters, &*self.dialect);
74 Ok(filter_push_down)
75 }
76
77 async fn scan(
78 &self,
79 _state: &dyn Session,
80 projection: Option<&Vec<usize>>,
81 filters: &[Expr],
82 limit: Option<usize>,
83 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
84 let logical_plan = self.create_logical_plan(projection, filters, limit)?;
85 let mut sql = Unparser::new(&*self.dialect).plan_to_sql(&logical_plan)?;
86
87 if let Some(args) = self.args.clone() {
88 let args = into_table_args(args);
89 let mut table_args = TableArgReplace::new(vec![(self.table_reference.clone(), args)]);
90 let _ = sql.visit(&mut table_args);
91 }
92
93 let sql = sql.to_string();
94 return self.create_physical_plan(projection, sql);
95 }
96}
97
98impl Display for ClickHouseTable {
99 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100 write!(f, "ClickHouseTable {}", self.table_reference)
101 }
102}