clickhouse_datafusion/providers/
table.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use datafusion::arrow::datatypes::SchemaRef;
5use datafusion::catalog::Session;
6use datafusion::datasource::TableProvider;
7use datafusion::datasource::sink::DataSinkExec;
8use datafusion::error::{DataFusionError, Result};
9use datafusion::execution::SendableRecordBatchStream;
10use datafusion::logical_expr::dml::InsertOp;
11use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
12use datafusion::physical_plan::ExecutionPlan;
13use datafusion::sql::TableReference;
14
15use crate::connection::ClickHouseConnectionPool;
16use crate::sink::ClickHouseDataSink;
17use crate::sql::{JoinPushDown, SqlTable, execute_sql_query};
18
/// Name handed to the [`SqlTable`] constructors whenever a
/// [`ClickHouseTableProvider`] builds its underlying reader.
pub const CLICKHOUSE_TABLE_PROVIDER_NAME: &str = "ClickHouseTableProvider";
20
/// A DataFusion [`TableProvider`] for a single ClickHouse table.
///
/// Reads (schema, filter push-down, scans) are delegated to the wrapped
/// [`SqlTable`]; inserts are routed through a [`ClickHouseDataSink`].
#[derive(Debug)]
pub struct ClickHouseTableProvider {
    /// Reference to the remote table; cloned into the data sink on insert.
    pub(crate) table: TableReference,
    /// Underlying SQL reader that holds the connection pool and schema.
    pub(crate) reader: SqlTable,
}
29
30impl ClickHouseTableProvider {
31 pub async fn try_new(
36 pool: Arc<ClickHouseConnectionPool>,
37 table: TableReference,
38 ) -> Result<Self> {
39 let inner = SqlTable::try_new(CLICKHOUSE_TABLE_PROVIDER_NAME, pool, table.clone()).await?;
40 Ok(Self { reader: inner, table })
41 }
42
43 pub fn new_with_schema_unchecked(
47 pool: Arc<ClickHouseConnectionPool>,
48 table: TableReference,
49 schema: SchemaRef,
50 ) -> Self {
51 let inner = SqlTable::new_with_schema_unchecked(
52 CLICKHOUSE_TABLE_PROVIDER_NAME,
53 pool,
54 schema,
55 table.clone(),
56 );
57 Self { reader: inner, table }
58 }
59
60 #[must_use]
61 pub fn with_exprs(mut self, exprs: Vec<Expr>) -> Self {
62 self.reader = self.reader.with_exprs(exprs);
63 self
64 }
65
66 #[must_use]
69 pub fn with_coercion(mut self, coerce: bool) -> Self {
70 self.reader = self.reader.with_coercion(coerce);
71 self
72 }
73
74 pub fn reader(&self) -> &SqlTable { &self.reader }
76
77 pub fn pool(&self) -> &Arc<ClickHouseConnectionPool> { self.reader.pool() }
79
80 pub fn coerce_schema(&self) -> bool { self.reader.coerce_schema }
82
83 pub fn execute_sql(&self, query: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
92 let coerce_schema = self.reader.coerce_schema;
93 let pool = Arc::clone(self.pool());
94 let query = query.to_string();
95 Ok(execute_sql_query(query, pool, schema, coerce_schema))
96 }
97
98 pub fn unique_context(&self) -> String {
103 match self.pool().join_push_down() {
104 JoinPushDown::AllowedFor(context) => context,
105 JoinPushDown::Disallow => format!("{}", self.reader.unique_id()),
109 }
110 }
111}
112
113#[async_trait]
114impl TableProvider for ClickHouseTableProvider {
115 fn as_any(&self) -> &dyn std::any::Any { self }
116
117 fn schema(&self) -> SchemaRef { self.reader.schema() }
118
119 fn table_type(&self) -> datafusion::datasource::TableType { self.reader.table_type() }
120
121 fn supports_filters_pushdown(
122 &self,
123 filters: &[&Expr],
124 ) -> Result<Vec<TableProviderFilterPushDown>> {
125 self.reader.supports_filters_pushdown(filters)
126 }
127
128 async fn scan(
129 &self,
130 state: &dyn Session,
131 projection: Option<&Vec<usize>>,
132 filters: &[Expr],
133 limit: Option<usize>,
134 ) -> Result<Arc<dyn ExecutionPlan>> {
135 self.reader.scan(state, projection, filters, limit).await
136 }
137
138 async fn insert_into(
139 &self,
140 _state: &dyn Session,
141 input: Arc<dyn ExecutionPlan>,
142 insert_op: InsertOp,
143 ) -> Result<Arc<dyn ExecutionPlan>> {
144 if matches!(insert_op, InsertOp::Overwrite) {
149 return Err(DataFusionError::NotImplemented(
150 "OVERWRITE operation not supported for ClickHouse".to_string(),
151 ));
152 }
153
154 Ok(Arc::new(DataSinkExec::new(
155 input,
156 Arc::new(ClickHouseDataSink::new(
157 Arc::clone(self.reader.pool()),
158 self.table.clone(),
159 self.reader.schema(),
160 )),
161 None,
162 )))
163 }
164}
165
/// Federation support: lets a [`ClickHouseTableProvider`] act as a
/// `datafusion_federation` SQL executor and table source.
#[cfg(feature = "federation")]
mod federation {
    use std::sync::Arc;

    use async_trait::async_trait;
    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::catalog::TableProvider;
    use datafusion::error::{DataFusionError, Result};
    use datafusion::execution::SendableRecordBatchStream;
    use datafusion::sql::TableReference;
    use datafusion_federation::sql::{
        RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
    };
    use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};
    use tracing::debug;

    use super::ClickHouseTableProvider;
    use crate::dialect::ClickHouseDialect;

    impl ClickHouseTableProvider {
        /// Creates a federated table source for this provider's table.
        ///
        /// The provider itself becomes the SQL executor of a new
        /// [`SQLFederationProvider`]; the source reuses the provider's current schema.
        pub fn create_federated_table_source(self: Arc<Self>) -> Arc<dyn FederatedTableSource> {
            let table_name: RemoteTableRef = self.table.clone().into();
            let schema = self.schema();
            debug!(table = %table_name.table_ref(), "Creating federated table source");
            let fed_provider = Arc::new(SQLFederationProvider::new(self));
            Arc::new(SQLTableSource::new_with_schema(fed_provider, table_name, schema))
        }

        /// Creates a [`FederatedTableProviderAdaptor`] pairing the federated source
        /// with this provider as the adaptor's wrapped table provider.
        pub fn create_federated_table_provider(self: Arc<Self>) -> FederatedTableProviderAdaptor {
            let table_source = Self::create_federated_table_source(Arc::clone(&self));
            FederatedTableProviderAdaptor::new_with_provider(table_source, self)
        }
    }

    #[async_trait]
    impl SQLExecutor for ClickHouseTableProvider {
        /// Executor name reported to the federation layer.
        fn name(&self) -> &'static str {
            "clickhouse"
        }

        /// Compute context reported by the underlying reader.
        fn compute_context(&self) -> Option<String> {
            self.reader.compute_context()
        }

        /// SQL dialect used when unparsing plans for ClickHouse.
        fn dialect(&self) -> Arc<dyn datafusion::sql::unparser::dialect::Dialect> {
            Arc::new(ClickHouseDialect)
        }

        /// No AST rewriting is applied.
        fn ast_analyzer(&self) -> Option<datafusion_federation::sql::AstAnalyzer> {
            None
        }

        /// Executes `query` through the underlying reader, expecting `schema`.
        fn execute(&self, query: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
            self.reader.execute(query, schema)
        }

        /// Lists the tables in the schema (database) of this provider's table.
        ///
        /// # Errors
        /// Returns [`DataFusionError::Execution`] when the table reference carries no
        /// schema component (previously this panicked), propagates connection errors,
        /// and wraps listing failures in [`DataFusionError::External`].
        async fn table_names(&self) -> Result<Vec<String>> {
            // Validate before connecting so a bare table reference fails cheaply
            // and without panicking.
            let schema = self.table.schema().ok_or_else(|| {
                DataFusionError::Execution(format!(
                    "Cannot list tables: table reference '{}' has no schema component",
                    self.table
                ))
            })?;
            self.pool()
                .connect()
                .await?
                .tables(schema)
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))
        }

        /// Fetches the schema of `table_name`.
        ///
        /// When this provider's table reference has a schema component, `table_name`
        /// is qualified with it; otherwise `table_name` is converted to a
        /// [`TableReference`] as-is.
        ///
        /// # Errors
        /// Propagates connection errors and wraps schema-lookup failures in
        /// [`DataFusionError::External`].
        async fn get_table_schema(&self, table_name: &str) -> Result<SchemaRef> {
            // `map_or_else` builds the fallback reference only when it is needed.
            let table_ref = self.table.schema().map_or_else(
                || TableReference::from(table_name),
                |schema| TableReference::partial(schema, table_name),
            );
            self.pool()
                .connect()
                .await?
                .get_schema(&table_ref)
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))
        }
    }
}