clickhouse_datafusion/sql.rs

//! # `ClickHouse` SQL `DataFusion` `TableProvider`
//!
//! This module implements a SQL [`TableProvider`] for `DataFusion`.
//!
//! This is used as a fallback if the `datafusion-federation` optimizer is not enabled.
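//!
//! A minimal usage sketch, assuming an already-configured
//! [`ClickHouseConnectionPool`](crate::connection::ClickHouseConnectionPool) named `pool`:
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! use datafusion::prelude::SessionContext;
//!
//! // `pool` is assumed to be constructed elsewhere.
//! let table = SqlTable::try_new("events", Arc::clone(&pool), "default.events").await?;
//! let ctx = SessionContext::new();
//! ctx.register_table("events", Arc::new(table))?;
//! ctx.sql("SELECT COUNT(*) FROM events").await?.show().await?;
//! ```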
use std::any::Any;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::sync::{Arc, LazyLock};

use async_trait::async_trait;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::catalog::Session;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::logical_expr::logical_plan::builder::LogicalTableSource;
use datafusion::logical_expr::{
    Expr, LogicalPlan, LogicalPlanBuilder, TableProviderFilterPushDown, TableType,
};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
    SendableRecordBatchStream,
};
use datafusion::sql::TableReference;
use datafusion::sql::unparser::Unparser;
use datafusion::sql::unparser::dialect::{DefaultDialect, Dialect};

use crate::connection::ClickHouseConnectionPool;
use crate::dialect::ClickHouseDialect;
use crate::stream::RecordBatchStream;
/// Helper function to execute a SQL query and return a stream of record batches.
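///
/// A minimal sketch, assuming a configured `pool` and a `schema` that matches the
/// query's result:
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// // `pool` and `schema` are assumed to be set up elsewhere.
/// let stream = execute_sql_query(
///     "SELECT number FROM system.numbers LIMIT 10",
///     Arc::clone(&pool),
///     schema,
///     false, // do not coerce batches to the provided schema
/// );
/// let batches: Vec<_> = stream.try_collect().await?;
/// ```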
pub fn execute_sql_query(
    sql: impl Into<String>,
    pool: Arc<ClickHouseConnectionPool>,
    schema: SchemaRef,
    coerce_schema: bool,
) -> SendableRecordBatchStream {
    let sql = sql.into();
    tracing::debug!("Running sql: {sql}");
    Box::pin(RecordBatchStream::new_from_query(sql, pool, schema, coerce_schema))
}

/// Controls whether join pushdown is allowed, and under what conditions.
///
/// Mainly here to provide future interop with `datafusion-table-providers`.
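///
/// For example, a pool might derive its context from the connection endpoint so
/// that joins are only pushed down between tables on the same server. A sketch;
/// `join_safe` and `endpoint` are hypothetical:
///
/// ```ignore
/// let push_down = if join_safe {
///     // Tables sharing this exact context string may be joined remotely.
///     JoinPushDown::AllowedFor(format!("clickhouse://{endpoint}"))
/// } else {
///     JoinPushDown::Disallow
/// };
/// ```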
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum JoinPushDown {
    /// This connection pool should not allow join push down. (i.e. we don't know under what
    /// conditions it is safe to send a join query to the database)
    Disallow,
    /// Allows join push down for other tables that share the same context.
    ///
    /// The context can be part of the connection string that uniquely identifies the server.
    AllowedFor(String),
}

/// A [`TableProvider`] for remote `ClickHouse` SQL tables.
#[derive(Clone)]
pub struct SqlTable {
    pub table_reference:      TableReference,
    pub(crate) coerce_schema: bool,
    pool:                     Arc<ClickHouseConnectionPool>,
    name:                     String,
    schema:                   SchemaRef,
    dialect:                  Option<Arc<ClickHouseDialect>>,
    exprs:                    Option<Vec<Expr>>,
}

impl fmt::Debug for SqlTable {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqlTable")
            .field("name", &self.name)
            .field("schema", &self.schema)
            .field("table_reference", &self.table_reference)
            .field("pool", &"ClickHouseConnectionPool")
            .field("dialect", &self.dialect)
            .field("exprs", &self.exprs)
            .field("coerce_schema", &self.coerce_schema)
            .finish()
    }
}

impl SqlTable {
    /// Create a new [`SqlTable`] from a name and a [`ClickHouseConnectionPool`].
    ///
    /// # Errors
    /// - Returns an error if the connection fails or the table's schema cannot be fetched
    ///   (e.g. the table does not exist).
    pub async fn try_new(
        name: &str,
        pool: Arc<ClickHouseConnectionPool>,
        table_reference: impl Into<TableReference>,
    ) -> Result<Self> {
        let table_reference = table_reference.into();
        let schema = pool.connect().await?.get_schema(&table_reference).await?;
        Ok(Self::new_with_schema_unchecked(name, pool, schema, table_reference))
    }

    /// Create a new [`SqlTable`] from a known schema, without checking that the table
    /// exists on the remote server.
    ///
    /// Prefer [`SqlTable::try_new`] when the schema should be fetched and validated.
    pub fn new_with_schema_unchecked(
        name: &str,
        pool: Arc<ClickHouseConnectionPool>,
        schema: impl Into<SchemaRef>,
        table_reference: impl Into<TableReference>,
    ) -> Self {
        Self {
            name: name.to_owned(),
            pool,
            schema: schema.into(),
            table_reference: table_reference.into(),
            dialect: Some(Arc::new(ClickHouseDialect)),
            exprs: None,
            coerce_schema: false,
        }
    }

    /// Expressions to project on top of the base table scan.
    #[must_use]
    pub fn with_exprs(mut self, exprs: Vec<Expr>) -> Self {
        self.exprs = Some(exprs);
        self
    }

    /// Whether to coerce the `RecordBatch`es returned by `SQLExecutor::execute` using the provided
    /// `LogicalPlan`'s schema.
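    ///
    /// A sketch of the builder-style configuration:
    ///
    /// ```ignore
    /// // `pool` is assumed to be constructed elsewhere.
    /// let table = SqlTable::try_new("events", pool, "default.events")
    ///     .await?
    ///     .with_coercion(true);
    /// ```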
    #[must_use]
    pub fn with_coercion(mut self, coerce: bool) -> Self {
        self.coerce_schema = coerce;
        self
    }

    /// Access the underlying connection pool.
    pub fn pool(&self) -> &Arc<ClickHouseConnectionPool> { &self.pool }

    // Return the current memory location of the object as a unique identifier
    pub(crate) fn unique_id(&self) -> usize { std::ptr::from_ref(self) as usize }

    fn dialect(&self) -> &(dyn Dialect + Send + Sync) {
        match &self.dialect {
            Some(dialect) => dialect.as_ref(),
            None => &DefaultDialect {},
        }
    }

    #[cfg(feature = "federation")]
    fn arc_dialect(&self) -> Arc<dyn Dialect + Send + Sync> {
        match &self.dialect {
            Some(dialect) => Arc::clone(dialect) as Arc<dyn Dialect>,
            None => Arc::new(DefaultDialect {}),
        }
    }
}


// Methods for generating SQL queries from a LogicalPlan and ExecutionPlans from SQL.
impl SqlTable {
    /// Unparse a scan of this table (projection, filters, limit) into a SQL string.
    ///
    /// # Errors
    /// - Returns an error if the logical plan creation fails
    fn scan_to_sql(
        &self,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<String> {
        let logical_plan = self.create_logical_plan(projection, filters, limit)?;
        let sql = Unparser::new(self.dialect()).plan_to_sql(&logical_plan)?.to_string();
        Ok(sql)
    }

    fn create_logical_plan(
        &self,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<LogicalPlan> {
        // Ensure the table reference does not include a `catalog`; ClickHouse uses
        // `database.table` references.
        let table_ref = match self.table_reference.clone() {
            TableReference::Full { schema, table, .. } => TableReference::partial(schema, table),
            table_ref => table_ref,
        };
        let table_source = LogicalTableSource::new(self.schema());
        let mut builder = LogicalPlanBuilder::scan_with_filters(
            table_ref,
            Arc::new(table_source),
            projection.cloned(),
            filters.to_vec(),
        )?;
        if let Some(exprs) = &self.exprs {
            builder = builder.project(exprs.clone())?;
        }
        builder.limit(0, limit)?.build()
    }

    fn create_physical_plan(
        &self,
        projection: Option<&Vec<usize>>,
        sql: String,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(
            ClickHouseSqlExec::try_new(projection, &self.schema(), Arc::clone(&self.pool), sql)?
                .with_coercion(self.coerce_schema),
        ))
    }
}


#[async_trait]
impl TableProvider for SqlTable {
    fn as_any(&self) -> &dyn Any { self }

    fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) }

    fn table_type(&self) -> TableType { TableType::Base }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        // A filter can be pushed down exactly when the unparser can render it as SQL.
        let filter_push_down: Vec<TableProviderFilterPushDown> = filters
            .iter()
            .map(|f| match Unparser::new(self.dialect()).expr_to_sql(f) {
                Ok(_) => TableProviderFilterPushDown::Exact,
                Err(_) => TableProviderFilterPushDown::Unsupported,
            })
            .collect();

        Ok(filter_push_down)
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let sql = self.scan_to_sql(projection, filters, limit)?;
        self.create_physical_plan(projection, sql)
    }
}

impl Display for SqlTable {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "ClickHouseSqlTable {}", self.name)
    }
}

static ONE_COLUMN_SCHEMA: LazyLock<SchemaRef> =
    LazyLock::new(|| Arc::new(Schema::new(vec![Field::new("1", DataType::Int64, true)])));

/// Project a schema safely, taking into account empty projections.
///
/// # Errors
/// - Returns an error if the projection fails
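///
/// For example, an empty projection maps to the placeholder one-column schema,
/// while `None` returns the input schema unchanged:
///
/// ```ignore
/// use std::sync::Arc;
/// use datafusion::arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Arc::new(Schema::new(vec![
///     Field::new("a", DataType::Int32, false),
///     Field::new("b", DataType::Utf8, true),
/// ]));
/// // Keep only column `b`.
/// let projected = project_schema_safe(&schema, Some(&vec![1]))?;
/// assert_eq!(projected.field(0).name(), "b");
/// // `Some(&vec![])` unparses as `SELECT 1`, so a single Int64 column is returned.
/// let one = project_schema_safe(&schema, Some(&vec![]))?;
/// assert_eq!(one.field(0).data_type(), &DataType::Int64);
/// ```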
pub fn project_schema_safe(
    schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> Result<SchemaRef> {
    let schema = match projection {
        Some(columns) => {
            if columns.is_empty() {
                // If the projection is Some([]) then it gets unparsed as `SELECT 1`, so return a
                // schema with a single Int64 column.
                //
                // See: <https://github.com/apache/datafusion/blob/83ce79c39412a4f150167d00e40ea05948c4870f/datafusion/sql/src/unparser/plan.rs#L998>
                ONE_COLUMN_SCHEMA.clone()
            } else {
                Arc::new(schema.project(columns)?)
            }
        }
        None => Arc::clone(schema),
    };
    Ok(schema)
}

/// A custom [`ExecutionPlan`] for [`SqlTable`]s.
#[derive(Clone)]
pub struct ClickHouseSqlExec {
    projected_schema: SchemaRef,
    pool:             Arc<ClickHouseConnectionPool>,
    sql:              String,
    properties:       PlanProperties,
    coerce_schema:    bool,
}

impl ClickHouseSqlExec {
    /// Create a new [`ClickHouseSqlExec`] instance.
    ///
    /// # Errors
    /// - Returns an error if the schema projection fails
    pub fn try_new(
        projection: Option<&Vec<usize>>,
        schema: &SchemaRef,
        pool: Arc<ClickHouseConnectionPool>,
        sql: String,
    ) -> Result<Self> {
        let projected_schema = project_schema_safe(schema, projection)?;
        Ok(Self {
            projected_schema: Arc::clone(&projected_schema),
            pool,
            sql,
            properties: PlanProperties::new(
                EquivalenceProperties::new(projected_schema),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
            coerce_schema: false,
        })
    }

    /// Whether the `RecordBatchStream` returned when calling `ExecutionPlan::execute` should
    /// attempt schema coercion on each `RecordBatch`.
    #[must_use]
    pub fn with_coercion(mut self, coerce: bool) -> Self {
        self.coerce_schema = coerce;
        self
    }

    /// Returns the SQL query string used by this execution plan.
    ///
    /// # Errors
    /// - Currently does not return an error. Kept here to allow for validation in the future.
    pub fn sql(&self) -> Result<String> { Ok(self.sql.clone()) }
}

impl fmt::Debug for ClickHouseSqlExec {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let sql = self.sql().unwrap_or_default();
        write!(f, "ClickHouseSqlExec sql={sql}")
    }
}

impl DisplayAs for ClickHouseSqlExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
        let sql = self.sql().unwrap_or_default();
        write!(f, "ClickHouseSqlExec sql={sql}")
    }
}

impl ExecutionPlan for ClickHouseSqlExec {
    fn name(&self) -> &'static str { "SqlExec" }

    fn as_any(&self) -> &dyn Any { self }

    fn schema(&self) -> SchemaRef { Arc::clone(&self.projected_schema) }

    fn properties(&self) -> &PlanProperties { &self.properties }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { vec![] }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let sql = self.sql()?;
        Ok(execute_sql_query(sql, Arc::clone(&self.pool), self.schema(), self.coerce_schema))
    }
}

#[cfg(feature = "federation")]
pub mod federation {

    use std::sync::Arc;

    use async_trait::async_trait;
    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::catalog::TableProvider;
    use datafusion::common::exec_err;
    use datafusion::error::Result;
    use datafusion::physical_plan::SendableRecordBatchStream;
    use datafusion::sql::TableReference;
    use datafusion::sql::unparser::dialect::Dialect;
    use datafusion_federation::sql::{
        RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
    };
    use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};

    use super::{JoinPushDown, SqlTable, execute_sql_query};

    impl SqlTable {
        /// Create a federated table source for this table provider.
        fn create_federated_table_source(self: Arc<Self>) -> Arc<dyn FederatedTableSource> {
            let table_ref = RemoteTableRef::from(self.table_reference.clone());
            let schema = self.schema();
            let fed_provider = Arc::new(SQLFederationProvider::new(self));
            Arc::new(SQLTableSource::new_with_schema(fed_provider, table_ref, schema))
        }

        /// Create a federated table provider wrapping this table provider.
        ///
        /// # Errors
        /// - Returns an error if the federated table source cannot be created.
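        ///
        /// A sketch of the federation wiring, assuming a `SessionContext` named `ctx`;
        /// the provider must already be in an `Arc` so the federated source can share it:
        ///
        /// ```ignore
        /// // `pool` and `ctx` are assumed to be constructed elsewhere.
        /// let table = Arc::new(SqlTable::try_new("events", pool, "default.events").await?);
        /// let federated = table.create_federated_table_provider()?;
        /// ctx.register_table("events", Arc::new(federated))?;
        /// ```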
        pub fn create_federated_table_provider(
            self: Arc<Self>,
        ) -> Result<FederatedTableProviderAdaptor> {
            let table_source = Self::create_federated_table_source(Arc::clone(&self));
            Ok(FederatedTableProviderAdaptor::new_with_provider(table_source, self))
        }
    }

    #[async_trait]
    impl SQLExecutor for SqlTable {
        fn name(&self) -> &str { &self.name }

        fn compute_context(&self) -> Option<String> {
            match self.pool.join_push_down() {
                JoinPushDown::AllowedFor(context) => Some(context),
                // Don't return None here - it will cause incorrect federation with other providers
                // of the same name that also have a compute_context of None.
                // Instead return a unique string (this provider's memory address) that will
                // never match any other provider's context.
                JoinPushDown::Disallow => Some(self.unique_id().to_string()),
            }
        }

        fn dialect(&self) -> Arc<dyn Dialect> { self.arc_dialect() }

        fn execute(&self, sql: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
            let sql = sql.to_string();
            let pool = Arc::clone(&self.pool);
            let coerce_schema = self.coerce_schema;
            Ok(execute_sql_query(sql, pool, schema, coerce_schema))
        }

        async fn table_names(&self) -> Result<Vec<String>> {
            let Some(schema) = self.table_reference.schema() else {
                return exec_err!("Cannot fetch tables without a schema");
            };
            self.pool.connect().await?.tables(schema).await
        }

        async fn get_table_schema(&self, table_name: &str) -> Result<SchemaRef> {
            let table_ref = self
                .table_reference
                .schema()
                .as_ref()
                .map(|s| TableReference::partial(*s, table_name))
                .unwrap_or_else(|| TableReference::from(table_name));
            self.pool.connect().await?.get_schema(&table_ref).await
        }
    }
}