clickhouse_datafusion/providers/
table_factory.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use clickhouse_arrow::{ArrowConnectionPoolBuilder, Destination};
6use datafusion::arrow::datatypes::SchemaRef;
7use datafusion::catalog::{Session, TableProvider, TableProviderFactory};
8use datafusion::common::exec_err;
9use datafusion::error::Result;
10use datafusion::logical_expr::CreateExternalTable;
11use datafusion::sql::TableReference;
12use parking_lot::Mutex;
13use tracing::{debug, warn};
14
15use crate::connection::ClickHouseConnectionPool;
16use crate::providers::table::ClickHouseTableProvider;
17
18// TODO: Docs - especially explain the different ways to use this, the fact that it exists since it
19// wraps a `ClickHouseConnectionPool`, and how to use it with a `ClickHouseConnectionPoolBuilder`.
20//
/// A table factory for creating `ClickHouse` [`TableProvider`]s.
///
/// Wraps a shared [`ClickHouseConnectionPool`] and builds a [`ClickHouseTableProvider`] for each
/// requested table reference. When the `federation` feature is enabled the provider is wrapped in
/// a federated table provider before being returned; otherwise the plain
/// [`ClickHouseTableProvider`] is returned.
#[derive(Debug, Clone)]
pub struct ClickHouseTableFactory {
    /// Connection pool shared (via `Arc`) with every table provider this factory creates.
    pool:          Arc<ClickHouseConnectionPool>,
    /// Coercion flag forwarded to each created provider through `with_coercion`.
    coerce_schema: bool,
}
30
31impl ClickHouseTableFactory {
32    pub fn new(pool: Arc<ClickHouseConnectionPool>) -> Self { Self { pool, coerce_schema: false } }
33
34    /// Get a reference to the connection pool.
35    pub fn pool(&self) -> &Arc<ClickHouseConnectionPool> { &self.pool }
36
37    /// Set whether to coerce the schema of the table provider.
38    #[must_use]
39    pub fn with_coercion(mut self, coerce_schema: bool) -> Self {
40        self.coerce_schema = coerce_schema;
41        self
42    }
43
44    /// # Errors
45    /// - Returns an error if the table provider cannot be created.
46    pub async fn table_provider(
47        &self,
48        table_reference: TableReference,
49    ) -> Result<Arc<dyn TableProvider + 'static>> {
50        let pool = Arc::clone(&self.pool);
51        debug!(%table_reference, "Creating ClickHouse table provider");
52
53        let provider = Arc::new(
54            ClickHouseTableProvider::try_new(pool, table_reference)
55                .await
56                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
57                .with_coercion(self.coerce_schema),
58        );
59
60        #[cfg(feature = "federation")]
61        let provider =
62            Arc::new(provider.create_federated_table_provider()) as Arc<dyn TableProvider>;
63
64        Ok(provider)
65    }
66
67    /// Create a table provider from a schema.
68    ///
69    /// # Errors
70    /// - Returns an error only in the federation feature path, if federating fails.
71    pub fn table_provider_from_schema(
72        &self,
73        table_reference: TableReference,
74        schema: SchemaRef,
75    ) -> Arc<dyn TableProvider + 'static> {
76        debug!(%table_reference, "Creating ClickHouse table provider from schema");
77        let provider = Arc::new(
78            ClickHouseTableProvider::new_with_schema_unchecked(
79                Arc::clone(&self.pool),
80                table_reference,
81                schema,
82            )
83            .with_coercion(self.coerce_schema),
84        );
85        #[cfg(feature = "federation")]
86        let provider = Arc::new(provider.create_federated_table_provider());
87        provider
88    }
89}
90
/// A `DataFusion` [`TableProviderFactory`] for creating `ClickHouse` tables.
///
/// Keeps at most one [`ClickHouseConnectionPool`] per [`Destination`]. The map sits behind an
/// `Arc<Mutex<..>>`, so clones of the factory share the same set of pools.
#[derive(Debug, Clone)]
pub struct ClickHouseTableProviderFactory {
    /// Connection pools keyed by the endpoint they were attached under.
    pools:          Arc<Mutex<HashMap<Destination, ClickHouseConnectionPool>>>,
    /// Coercion flag handed to the [`ClickHouseTableFactory`] used when creating tables.
    coerce_schemas: bool,
}
97
98impl ClickHouseTableProviderFactory {
99    pub fn new() -> Self {
100        Self { pools: Arc::new(Mutex::new(HashMap::default())), coerce_schemas: false }
101    }
102
103    /// Set whether [`ClickHouseTableProvider`]s should attempt to coerce `RecordBatch` schemas to
104    /// the schema of the `LogicalPlan` being executed.
105    #[must_use]
106    pub fn with_coercion(mut self, coerce: bool) -> Self {
107        self.coerce_schemas = coerce;
108        self
109    }
110
111    pub fn coerce_schemas(&self) -> bool { self.coerce_schemas }
112}
113
114impl ClickHouseTableProviderFactory {
115    // TODO: Docs
116    /// # Errors
117    /// - Returns an error if creating the `ClickHouseConnectionPool` fails.
118    pub async fn new_with_builder(
119        endpoint: impl Into<Destination>,
120        builder: ArrowConnectionPoolBuilder,
121    ) -> Result<Self> {
122        let this = Self::new();
123        drop(this.attach_pool_builder(endpoint, builder).await?);
124        Ok(this)
125    }
126
127    // TODO: Docs
128    /// Attach an existing [`ClickHouseConnectionPool`] to the factory by providing
129    /// [`ArrowConnectionPoolBuilder`] which will be built into a [`ClickHouseConnectionPool`]
130    ///
131    /// # Errors
132    /// - Returns an error if creating the `ClickHouseConnectionPool` fails.
133    pub async fn attach_pool_builder(
134        &self,
135        endpoint: impl Into<Destination>,
136        builder: ArrowConnectionPoolBuilder,
137    ) -> Result<ClickHouseConnectionPool> {
138        let endpoint = endpoint.into();
139        debug!(?endpoint, "Attaching ClickHouse connection pool");
140
141        // Since this pool will be used for ddl, it's essential it connects to the "default" db
142        let builder = builder.configure_client(|c| c.with_database("default"));
143
144        // Create connection pool
145        let pool = ClickHouseConnectionPool::from_pool_builder(builder).await?;
146        debug!(?endpoint, "Connection pool created successfully");
147
148        // Update map
149        drop(self.pools.lock().insert(endpoint, pool.clone()));
150        Ok(pool)
151    }
152
153    // TODO: Docs - explain how serialized params are used by `TableProviderFactory` below and how
154    // this method enables it
155    //
156    /// Get or create a connection pool from parameters, returning an existing connection pool if
157    /// the endpoint is already connected
158    async fn get_or_create_pool_from_params(
159        &self,
160        endpoint: &str,
161        params: &mut HashMap<String, String>,
162    ) -> Result<ClickHouseConnectionPool> {
163        if endpoint.is_empty() {
164            tracing::error!("Endpoint is required for ClickHouse, received empty value");
165            return exec_err!("Endpoint is required for ClickHouse");
166        }
167
168        let destination = Destination::from(endpoint);
169        if let Some(pool) = self.pools.lock().get(&destination) {
170            debug!("Pool exists for endpoint: {endpoint}");
171            return Ok(pool.clone());
172        }
173
174        // Parse options (e.g., host, port, database)
175        // NOTE: Settings are ignored since this path is for creating tables and settings will be
176        // delegated to table creation
177        let clickhouse_options = crate::utils::params_to_pool_builder(endpoint, params, true)?;
178
179        // Create connection pool
180        self.attach_pool_builder(destination, clickhouse_options).await
181    }
182}
183
184impl Default for ClickHouseTableProviderFactory {
185    fn default() -> Self { Self::new() }
186}
187
188#[async_trait]
189impl TableProviderFactory for ClickHouseTableProviderFactory {
190    async fn create(
191        &self,
192        _state: &dyn Session,
193        cmd: &CreateExternalTable,
194    ) -> Result<Arc<dyn TableProvider>> {
195        if !cmd.constraints.is_empty() {
196            warn!("Constraints not fully supported in ClickHouse; ignoring: {:?}", cmd.constraints);
197        }
198
199        let name = cmd.name.clone();
200        let mut params = cmd.options.clone();
201        let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
202
203        // Pull out endpoint
204        let endpoint =
205            params.get("endpoint").map(ToString::to_string).unwrap_or(cmd.location.clone());
206
207        // Pull out database
208        let database = name
209            .schema()
210            .or(params.get(crate::utils::DEFAULT_DATABASE_PARAM).map(String::as_str))
211            .unwrap_or("default")
212            .to_string();
213
214        // Get or create a clickhouse connection pool
215        let pool = Arc::new(
216            self.get_or_create_pool_from_params(&endpoint, &mut params)
217                .await
218                .inspect_err(|error| tracing::error!(?error, "Failed to create connection pool"))?,
219        );
220
221        // Ensure table reference is properly formatted
222        let name = match name {
223            t @ TableReference::Partial { .. } => t,
224            TableReference::Bare { table } => TableReference::partial(database.as_str(), table),
225            TableReference::Full { schema, table, .. } => TableReference::partial(schema, table),
226        };
227
228        debug!(?name, "Table provider factory creating schema");
229        // Get table options
230        let column_defaults = &cmd.column_defaults;
231        let create_options =
232            crate::utils::params::params_to_create_options(&mut params, column_defaults)
233                .inspect_err(|error| {
234                    tracing::error!(
235                        ?error,
236                        ?params,
237                        "Could not generate table options from params"
238                    );
239                })?;
240
241        // Create table and optionally database
242        crate::utils::create_schema(&name, &schema, &create_options, &pool, cmd.if_not_exists)
243            .await?;
244
245        // Create table provider
246        Ok(ClickHouseTableFactory::new(pool)
247            .with_coercion(self.coerce_schemas)
248            .table_provider_from_schema(name, schema))
249    }
250}