clickhouse_datafusion/providers/table.rs

use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::catalog::Session;
use datafusion::datasource::TableProvider;
use datafusion::datasource::sink::DataSinkExec;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::SendableRecordBatchStream;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::sql::TableReference;

use crate::connection::ClickHouseConnectionPool;
use crate::sink::ClickHouseDataSink;
use crate::sql::{JoinPushDown, SqlTable, execute_sql_query};

pub const CLICKHOUSE_TABLE_PROVIDER_NAME: &str = "ClickHouseTableProvider";
/// A custom [`TableProvider`] for remote `ClickHouse` tables.
///
/// Scans are pushed down to the remote server as SQL via the underlying [`SqlTable`];
/// inserts are written through a [`ClickHouseDataSink`].
#[derive(Debug)]
pub struct ClickHouseTableProvider {
    pub(crate) table:  TableReference,
    pub(crate) reader: SqlTable,
}

impl ClickHouseTableProvider {
    /// Creates a new `TableProvider`, fetching the table's schema from `ClickHouse`.
    ///
    /// # Errors
    /// - Returns an error if the `SqlTable` creation fails.
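    ///
    /// # Example
    ///
    /// A minimal sketch; `pool` is assumed to be an already-configured
    /// [`ClickHouseConnectionPool`], and `ctx` an illustrative `SessionContext`.
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use datafusion::sql::TableReference;
    ///
    /// let table = TableReference::partial("my_db", "my_table");
    /// let provider = ClickHouseTableProvider::try_new(Arc::clone(&pool), table).await?;
    /// ctx.register_table("my_table", Arc::new(provider))?;
    /// ```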
    pub async fn try_new(
        pool: Arc<ClickHouseConnectionPool>,
        table: TableReference,
    ) -> Result<Self> {
        let inner = SqlTable::try_new(CLICKHOUSE_TABLE_PROVIDER_NAME, pool, table.clone()).await?;
        Ok(Self { reader: inner, table })
    }

    /// Creates a new `TableProvider` with a pre-fetched schema. The underlying `SqlTable` will
    /// not verify that the table exists or that the schema is accurate. To catch errors in
    /// either case, use [`try_new`](Self::try_new).
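    ///
    /// # Example
    ///
    /// A sketch with a hand-built Arrow schema; the field layout here is illustrative and
    /// must match the remote table for queries to succeed.
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use datafusion::arrow::datatypes::{DataType, Field, Schema};
    /// use datafusion::sql::TableReference;
    ///
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new("id", DataType::UInt64, false),
    ///     Field::new("name", DataType::Utf8, true),
    /// ]));
    /// let provider = ClickHouseTableProvider::new_with_schema_unchecked(
    ///     Arc::clone(&pool),
    ///     TableReference::partial("my_db", "my_table"),
    ///     schema,
    /// );
    /// ```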
    pub fn new_with_schema_unchecked(
        pool: Arc<ClickHouseConnectionPool>,
        table: TableReference,
        schema: SchemaRef,
    ) -> Self {
        let inner = SqlTable::new_with_schema_unchecked(
            CLICKHOUSE_TABLE_PROVIDER_NAME,
            pool,
            schema,
            table.clone(),
        );
        Self { reader: inner, table }
    }

    /// Attaches expressions that are forwarded to the underlying [`SqlTable`].
    #[must_use]
    pub fn with_exprs(mut self, exprs: Vec<Expr>) -> Self {
        self.reader = self.reader.with_exprs(exprs);
        self
    }

    /// Sets whether to coerce the `RecordBatch`es returned by `SQLExecutor::execute` to the
    /// generated `LogicalPlan`'s schema.
    #[must_use]
    pub fn with_coercion(mut self, coerce: bool) -> Self {
        self.reader = self.reader.with_coercion(coerce);
        self
    }

    /// Returns the underlying [`SqlTable`] reader.
    pub fn reader(&self) -> &SqlTable { &self.reader }

    /// Returns the connection pool backing this provider.
    pub fn pool(&self) -> &Arc<ClickHouseConnectionPool> { self.reader.pool() }

    /// Returns whether schema coercion is enabled.
    pub fn coerce_schema(&self) -> bool { self.reader.coerce_schema }

    /// Executes a SQL query and returns a stream of record batches.
    ///
    /// This method is exposed so that SQL can be run through this table provider without
    /// requiring federation.
    ///
    /// # Errors
    /// - Returns an error if the connection pool fails to connect.
    /// - Returns an error if the query execution fails.
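    ///
    /// # Example
    ///
    /// A sketch, assuming `provider` is a `ClickHouseTableProvider` whose schema matches the
    /// query's projection:
    ///
    /// ```rust,ignore
    /// use datafusion::physical_plan::common::collect;
    ///
    /// let stream = provider.execute_sql("SELECT id, name FROM my_db.my_table", provider.schema())?;
    /// let batches = collect(stream).await?;
    /// ```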
    pub fn execute_sql(&self, query: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
        let coerce_schema = self.reader.coerce_schema;
        let pool = Arc::clone(self.pool());
        let query = query.to_string();
        Ok(execute_sql_query(query, pool, schema, coerce_schema))
    }

    /// Provides the unique context for this table provider.
    ///
    /// This method is public so callers can check whether this provider accesses the same
    /// underlying `ClickHouse` server as another, without relying on federation.
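    ///
    /// # Example
    ///
    /// A sketch comparing two providers (`a` and `b` are illustrative):
    ///
    /// ```rust,ignore
    /// if a.unique_context() == b.unique_context() {
    ///     // Both providers talk to the same ClickHouse server.
    /// }
    /// ```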
    pub fn unique_context(&self) -> String {
        match self.pool().join_push_down() {
            JoinPushDown::AllowedFor(context) => context,
            // Don't return None here - it will cause incorrect federation with other providers
            // of the same name that also have a compute_context of None. Instead return a
            // unique string that will never match any other provider's context.
            JoinPushDown::Disallow => self.reader.unique_id().to_string(),
        }
    }
}

#[async_trait]
impl TableProvider for ClickHouseTableProvider {
    fn as_any(&self) -> &dyn std::any::Any { self }

    fn schema(&self) -> SchemaRef { self.reader.schema() }

    fn table_type(&self) -> datafusion::datasource::TableType { self.reader.table_type() }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        self.reader.supports_filters_pushdown(filters)
    }

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        self.reader.scan(state, projection, filters, limit).await
    }

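    /// A usage sketch (`ctx` is an illustrative `SessionContext` with this provider
    /// registered as `my_table`); appends are supported, overwrites are rejected:
    ///
    /// ```rust,ignore
    /// // Routed here with InsertOp::Append:
    /// ctx.sql("INSERT INTO my_table VALUES (1, 'a')").await?.collect().await?;
    /// // InsertOp::Overwrite returns a NotImplemented error.
    /// ```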
    async fn insert_into(
        &self,
        _state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // ClickHouse doesn't support OVERWRITE natively in the same way as some systems,
        // so we'll treat it as an error for now until something better is implemented.
        //
        // TODO: Implement `overwrite` by truncating
        if matches!(insert_op, InsertOp::Overwrite) {
            return Err(DataFusionError::NotImplemented(
                "OVERWRITE operation not supported for ClickHouse".to_string(),
            ));
        }

        Ok(Arc::new(DataSinkExec::new(
            input,
            Arc::new(ClickHouseDataSink::new(
                Arc::clone(self.reader.pool()),
                self.table.clone(),
                self.reader.schema(),
            )),
            None,
        )))
    }
}

#[cfg(feature = "federation")]
mod federation {
    use std::sync::Arc;

    use async_trait::async_trait;
    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::catalog::TableProvider;
    use datafusion::error::{DataFusionError, Result};
    use datafusion::execution::SendableRecordBatchStream;
    use datafusion::sql::TableReference;
    use datafusion_federation::sql::{
        RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
    };
    use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};
    use tracing::debug;

    use super::ClickHouseTableProvider;
    use crate::dialect::ClickHouseDialect;

    impl ClickHouseTableProvider {
        /// Creates a federated table source for this table provider.
        pub fn create_federated_table_source(self: Arc<Self>) -> Arc<dyn FederatedTableSource> {
            let table_name: RemoteTableRef = self.table.clone().into();
            let schema = self.schema();
            debug!(table = %table_name.table_ref(), "Creating federated table source");
            let fed_provider = Arc::new(SQLFederationProvider::new(self));
            Arc::new(SQLTableSource::new_with_schema(fed_provider, table_name, schema))
        }

        /// Creates a federated table provider wrapping this table provider.
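        ///
        /// A sketch, assuming `provider` is an `Arc<ClickHouseTableProvider>` and `ctx` is a
        /// `SessionContext` configured for federation:
        ///
        /// ```rust,ignore
        /// let federated = provider.create_federated_table_provider();
        /// ctx.register_table("my_table", Arc::new(federated))?;
        /// ```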
        pub fn create_federated_table_provider(self: Arc<Self>) -> FederatedTableProviderAdaptor {
            let table_source = Self::create_federated_table_source(Arc::clone(&self));
            FederatedTableProviderAdaptor::new_with_provider(table_source, self)
        }
    }

    #[async_trait]
    impl SQLExecutor for ClickHouseTableProvider {
        fn name(&self) -> &'static str { "clickhouse" }

        fn compute_context(&self) -> Option<String> { self.reader.compute_context() }

        fn dialect(&self) -> Arc<dyn datafusion::sql::unparser::dialect::Dialect> {
            Arc::new(ClickHouseDialect)
        }

        fn ast_analyzer(&self) -> Option<datafusion_federation::sql::AstAnalyzer> {
            // No custom AST rewriting is needed for now; `arrayJoin` and other functions are
            // handled by the dialect.
            None
        }

        fn execute(&self, query: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
            self.reader.execute(query, schema)
        }

        async fn table_names(&self) -> Result<Vec<String>> {
            let schema = self.table.schema().ok_or_else(|| {
                DataFusionError::Plan("Table reference is missing a schema".to_string())
            })?;
            self.pool()
                .connect()
                .await?
                .tables(schema)
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))
        }

        async fn get_table_schema(&self, table_name: &str) -> Result<SchemaRef> {
            let table_ref = self
                .table
                .schema()
                .map(|s| TableReference::partial(s, table_name))
                .unwrap_or_else(|| TableReference::from(table_name));
            self.pool()
                .connect()
                .await?
                .get_schema(&table_ref)
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))
        }
    }
}