clickhouse_datafusion/sql.rs

//! # `ClickHouse` SQL `DataFusion` `TableProvider`
//!
//! This module implements a SQL [`TableProvider`] for `DataFusion`.
//!
//! This is used as a fallback if the `datafusion-federation` optimizer is not enabled.
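//!
//! # Example
//!
//! A minimal usage sketch (assumes a `ClickHouseConnectionPool` and a `DataFusion`
//! `SessionContext` have already been constructed elsewhere):
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! // Resolve the remote table's schema and register it with DataFusion.
//! let table = SqlTable::try_new("events", Arc::clone(&pool), "my_db.events").await?;
//! ctx.register_table("events", Arc::new(table))?;
//! let df = ctx.sql("SELECT count(*) FROM events").await?;
//! ```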
use std::any::Any;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::catalog::Session;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::logical_expr::logical_plan::builder::LogicalTableSource;
use datafusion::logical_expr::{
    Expr, LogicalPlan, LogicalPlanBuilder, TableProviderFilterPushDown, TableType,
};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
    SendableRecordBatchStream,
};
use datafusion::sql::TableReference;
use datafusion::sql::unparser::Unparser;
use datafusion::sql::unparser::dialect::{DefaultDialect, Dialect};

use crate::connection::ClickHouseConnectionPool;
use crate::dialect::ClickHouseDialect;
use crate::stream::RecordBatchStream;

/// Helper function to execute a SQL query and return a stream of record batches.
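///
/// A usage sketch (assumes a pool and a `SchemaRef` matching the query's result are
/// already in hand):
///
/// ```ignore
/// let stream = execute_sql_query("SELECT 1", Arc::clone(&pool), schema, false);
/// ```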
pub fn execute_sql_query(
    sql: impl Into<String>,
    pool: Arc<ClickHouseConnectionPool>,
    schema: SchemaRef,
    coerce_schema: bool,
) -> SendableRecordBatchStream {
    let sql = sql.into();
    tracing::debug!("Running sql: {sql}");
    Box::pin(RecordBatchStream::new_from_query(sql, pool, schema, coerce_schema))
}

/// Controls whether join pushdown is allowed, and under what conditions.
///
/// Mainly here to provide future interop with `datafusion-table-providers`.
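///
/// For example (the context string here is illustrative), two tables permit a remote
/// join only when their contexts compare equal:
///
/// ```ignore
/// let a = JoinPushDown::AllowedFor("clickhouse://host:9000/db".to_string());
/// let b = JoinPushDown::AllowedFor("clickhouse://host:9000/db".to_string());
/// assert_eq!(a, b); // same context => safe to push the join down
/// ```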
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum JoinPushDown {
    /// This connection pool should not allow join push down. (i.e. we don't know under what
    /// conditions it is safe to send a join query to the database)
    Disallow,
    /// Allows join push down for other tables that share the same context.
    ///
    /// The context can be part of the connection string that uniquely identifies the server.
    AllowedFor(String),
}

/// A [`TableProvider`] for remote `ClickHouse` SQL tables.
#[derive(Clone)]
pub struct SqlTable {
    pub table_reference:      TableReference,
    pub(crate) coerce_schema: bool,
    pool:                     Arc<ClickHouseConnectionPool>,
    name:                     String,
    schema:                   SchemaRef,
    dialect:                  Option<Arc<ClickHouseDialect>>,
    exprs:                    Option<Vec<Expr>>,
}

impl fmt::Debug for SqlTable {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqlTable")
            .field("name", &self.name)
            .field("schema", &self.schema)
            .field("table_reference", &self.table_reference)
            .field("pool", &"ClickHouseConnectionPool")
            .field("dialect", &self.dialect)
            .field("exprs", &self.exprs)
            .field("coerce_schema", &self.coerce_schema)
            .finish()
    }
}

impl SqlTable {
    /// Create a new [`SqlTable`] from a name and a [`ClickHouseConnectionPool`].
    ///
    /// # Errors
    /// - Returns an error if the table does not exist.
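    ///
    /// # Example
    ///
    /// A sketch, assuming a pool has already been constructed:
    ///
    /// ```ignore
    /// let table = SqlTable::try_new("events", Arc::clone(&pool), "my_db.events").await?;
    /// ```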
    pub async fn try_new(
        name: &str,
        pool: Arc<ClickHouseConnectionPool>,
        table_reference: impl Into<TableReference>,
    ) -> Result<Self> {
        let table_reference = table_reference.into();
        let schema = pool.connect().await?.get_schema(&table_reference).await?;
        Ok(Self::new_with_schema_unchecked(name, pool, schema, table_reference))
    }

    /// Create a new [`SqlTable`] from a known schema, without checking that the table exists.
    ///
    /// Unlike [`SqlTable::try_new`], this performs no database round trip to validate the
    /// table reference or fetch its schema.
    pub fn new_with_schema_unchecked(
        name: &str,
        pool: Arc<ClickHouseConnectionPool>,
        schema: impl Into<SchemaRef>,
        table_reference: impl Into<TableReference>,
    ) -> Self {
        Self {
            name: name.to_owned(),
            pool,
            schema: schema.into(),
            table_reference: table_reference.into(),
            dialect: Some(Arc::new(ClickHouseDialect)),
            exprs: None,
            coerce_schema: false,
        }
    }

    #[must_use]
    pub fn with_exprs(mut self, exprs: Vec<Expr>) -> Self {
        self.exprs = Some(exprs);
        self
    }

    /// Whether to coerce the `RecordBatch`es returned by `SQLExecutor::execute` using the provided
    /// `LogicalPlan`'s schema.
    #[must_use]
    pub fn with_coercion(mut self, coerce: bool) -> Self {
        self.coerce_schema = coerce;
        self
    }

    /// Access the underlying connection pool
    pub fn pool(&self) -> &Arc<ClickHouseConnectionPool> { &self.pool }

    // Return the current memory location of the object as a unique identifier
    pub(crate) fn unique_id(&self) -> usize { std::ptr::from_ref(self) as usize }

    fn dialect(&self) -> &(dyn Dialect + Send + Sync) {
        match &self.dialect {
            Some(dialect) => dialect.as_ref(),
            None => &DefaultDialect {},
        }
    }

    #[cfg(feature = "federation")]
    fn arc_dialect(&self) -> Arc<dyn Dialect + Send + Sync> {
        match &self.dialect {
            // Let each arm coerce to the `Send + Sync` trait object; casting to a bare
            // `Arc<dyn Dialect>` here would not match the return type.
            Some(dialect) => Arc::clone(dialect),
            None => Arc::new(DefaultDialect {}),
        }
    }
}

// Methods for generating SQL queries from a LogicalPlan and ExecutionPlans from SQL.
impl SqlTable {
    /// # Errors
    /// - Returns an error if the logical plan creation fails
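    ///
    /// A sketch of the output's shape (table and column names are illustrative; exact
    /// quoting depends on the dialect):
    ///
    /// ```ignore
    /// // With projection [0, 1], one filter, and limit 5, this might unparse to:
    /// // SELECT "id", "name" FROM "my_db"."events" WHERE ("id" > 10) LIMIT 5
    /// let sql = table.scan_to_sql(Some(&vec![0, 1]), &filters, Some(5))?;
    /// ```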
    fn scan_to_sql(
        &self,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<String> {
        let logical_plan = self.create_logical_plan(projection, filters, limit)?;
        let sql = Unparser::new(self.dialect()).plan_to_sql(&logical_plan)?.to_string();
        Ok(sql)
    }

    fn create_logical_plan(
        &self,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<LogicalPlan> {
        // Ensure table reference does not include `catalog`
        let table_ref = match self.table_reference.clone() {
            TableReference::Full { schema, table, .. } => TableReference::partial(schema, table),
            table_ref => table_ref,
        };
        let table_source = LogicalTableSource::new(self.schema());
        let mut builder = LogicalPlanBuilder::scan_with_filters(
            table_ref,
            Arc::new(table_source),
            projection.cloned(),
            filters.to_vec(),
        )?;
        if let Some(exprs) = &self.exprs {
            builder = builder.project(exprs.clone())?;
        }
        builder.limit(0, limit)?.build()
    }

    fn create_physical_plan(
        &self,
        projection: Option<&Vec<usize>>,
        sql: String,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(
            ClickHouseSqlExec::try_new(projection, &self.schema(), Arc::clone(&self.pool), sql)?
                .with_coercion(self.coerce_schema),
        ))
    }
}

#[async_trait]
impl TableProvider for SqlTable {
    fn as_any(&self) -> &dyn Any { self }

    fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) }

    fn table_type(&self) -> TableType { TableType::Base }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        let filter_push_down: Vec<TableProviderFilterPushDown> = filters
            .iter()
            .map(|f| match Unparser::new(self.dialect()).expr_to_sql(f) {
                Ok(_) => TableProviderFilterPushDown::Exact,
                Err(_) => TableProviderFilterPushDown::Unsupported,
            })
            .collect();

        Ok(filter_push_down)
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let sql = self.scan_to_sql(projection, filters, limit)?;
        self.create_physical_plan(projection, sql)
    }
}

impl Display for SqlTable {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "ClickHouseSqlTable {}", self.name)
    }
}

/// Project a schema safely, taking empty projections into account.
///
/// When `DataFusion` executes queries like `COUNT(*)`, it passes an empty projection
/// (`Some([])`) indicating no columns are needed. This function returns an empty
/// schema in that case, as `DataFusion` only needs row counts, not column values.
///
/// The underlying SQL still generates `SELECT 1 FROM table` for `ClickHouse`, but
/// the `RecordBatchStream` schema correctly reflects that no column data is accessed.
///
/// # Errors
/// - Returns an error if the schema projection fails
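///
/// # Example
///
/// A sketch of the empty-projection case described above:
///
/// ```ignore
/// let projected = project_schema_safe(&schema, Some(&vec![]))?;
/// assert!(projected.fields().is_empty()); // COUNT(*)-style scans read no columns
/// ```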
pub fn project_schema_safe(
    schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> Result<SchemaRef> {
    let schema = match projection {
        Some(columns) => {
            if columns.is_empty() {
                // Empty projection = no columns needed (e.g., COUNT(*))
                // Return empty schema to match DataFusion's logical plan expectations
                Arc::new(Schema::empty())
            } else {
                Arc::new(schema.project(columns)?)
            }
        }
        None => Arc::clone(schema),
    };
    Ok(schema)
}

/// A custom [`ExecutionPlan`] for [`SqlTable`]s.
#[derive(Clone)]
pub struct ClickHouseSqlExec {
    projected_schema: SchemaRef,
    pool:             Arc<ClickHouseConnectionPool>,
    sql:              String,
    properties:       PlanProperties,
    coerce_schema:    bool,
}

impl ClickHouseSqlExec {
    /// Create a new [`ClickHouseSqlExec`] instance.
    ///
    /// # Errors
    /// - Returns an error if the schema projection fails
    pub fn try_new(
        projection: Option<&Vec<usize>>,
        schema: &SchemaRef,
        pool: Arc<ClickHouseConnectionPool>,
        sql: String,
    ) -> Result<Self> {
        let projected_schema = project_schema_safe(schema, projection)?;
        Ok(Self {
            projected_schema: Arc::clone(&projected_schema),
            pool,
            sql,
            properties: PlanProperties::new(
                EquivalenceProperties::new(projected_schema),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
            coerce_schema: false,
        })
    }

    /// Whether the `RecordBatchStream` returned when calling `ExecutionPlan::execute` should
    /// attempt schema coercion on each `RecordBatch`.
    #[must_use]
    pub fn with_coercion(mut self, coerce: bool) -> Self {
        self.coerce_schema = coerce;
        self
    }

    /// Returns the SQL query string used by this execution plan.
    ///
    /// # Errors
    /// - Currently does not return an error. Kept here to allow for validation in the future.
    pub fn sql(&self) -> Result<String> { Ok(self.sql.clone()) }
}

impl fmt::Debug for ClickHouseSqlExec {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let sql = self.sql().unwrap_or_default();
        write!(f, "ClickHouseSqlExec sql={sql}")
    }
}

impl DisplayAs for ClickHouseSqlExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
        let sql = self.sql().unwrap_or_default();
        write!(f, "ClickHouseSqlExec sql={sql}")
    }
}

impl ExecutionPlan for ClickHouseSqlExec {
    fn name(&self) -> &'static str { "SqlExec" }

    fn as_any(&self) -> &dyn Any { self }

    fn schema(&self) -> SchemaRef { Arc::clone(&self.projected_schema) }

    fn properties(&self) -> &PlanProperties { &self.properties }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { vec![] }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let sql = self.sql()?;
        Ok(execute_sql_query(sql, Arc::clone(&self.pool), self.schema(), self.coerce_schema))
    }
}

#[cfg(feature = "federation")]
pub mod federation {
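    //! Federation support for [`SqlTable`].
    //!
    //! Implements [`SQLExecutor`] so that `datafusion-federation` can push whole query
    //! subtrees, including joins permitted by [`JoinPushDown`], down to `ClickHouse`.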

    use std::sync::Arc;

    use async_trait::async_trait;
    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::catalog::TableProvider;
    use datafusion::common::exec_err;
    use datafusion::error::Result;
    use datafusion::physical_plan::SendableRecordBatchStream;
    use datafusion::sql::TableReference;
    use datafusion::sql::unparser::dialect::Dialect;
    use datafusion_federation::sql::{
        RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
    };
    use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};

    use super::{JoinPushDown, SqlTable, execute_sql_query};

    impl SqlTable {
        /// Create a federated table source for this table provider.
        fn create_federated_table_source(self: Arc<Self>) -> Arc<dyn FederatedTableSource> {
            let table_ref = RemoteTableRef::from(self.table_reference.clone());
            let schema = self.schema();
            let fed_provider = Arc::new(SQLFederationProvider::new(self));
            Arc::new(SQLTableSource::new_with_schema(fed_provider, table_ref, schema))
        }

        /// Create a federated table provider wrapping this table provider.
        ///
        /// # Errors
        /// - Error if `create_federated_table_source` fails.
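        ///
        /// A usage sketch (assumes a table and a `SessionContext` built elsewhere):
        ///
        /// ```ignore
        /// let provider = Arc::new(table).create_federated_table_provider()?;
        /// ctx.register_table("events", Arc::new(provider))?;
        /// ```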
        pub fn create_federated_table_provider(
            self: Arc<Self>,
        ) -> Result<FederatedTableProviderAdaptor> {
            let table_source = Self::create_federated_table_source(Arc::clone(&self));
            Ok(FederatedTableProviderAdaptor::new_with_provider(table_source, self))
        }
    }

    #[async_trait]
    impl SQLExecutor for SqlTable {
        fn name(&self) -> &str { &self.name }

        fn compute_context(&self) -> Option<String> {
            match self.pool.join_push_down() {
                JoinPushDown::AllowedFor(context) => Some(context),
                // Don't return None here - it would cause incorrect federation with other
                // providers of the same name that also have a compute_context of None.
                // Instead return a per-instance unique string (the table's memory address)
                // that will never match any other provider's context.
                JoinPushDown::Disallow => Some(self.unique_id().to_string()),
            }
        }

        fn dialect(&self) -> Arc<dyn Dialect> { self.arc_dialect() }

        fn execute(&self, sql: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
            let sql = sql.to_string();
            let pool = Arc::clone(&self.pool);
            let coerce_schema = self.coerce_schema;
            Ok(execute_sql_query(sql, pool, schema, coerce_schema))
        }

        async fn table_names(&self) -> Result<Vec<String>> {
            let Some(schema) = self.table_reference.schema() else {
                return exec_err!("Cannot fetch tables without a schema");
            };
            self.pool.connect().await?.tables(schema).await
        }

        async fn get_table_schema(&self, table_name: &str) -> Result<SchemaRef> {
            let table_ref = self
                .table_reference
                .schema()
                .as_ref()
                .map(|s| TableReference::partial(*s, table_name))
                .unwrap_or(TableReference::from(table_name));
            self.pool.connect().await?.get_schema(&table_ref).await
        }
    }
}