clickhouse_datafusion/providers/table_factory.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use clickhouse_arrow::{ArrowConnectionManager, ArrowConnectionPoolBuilder, Destination};
6use datafusion::arrow::datatypes::SchemaRef;
7use datafusion::catalog::{Session, TableProvider, TableProviderFactory};
8use datafusion::common::exec_err;
9use datafusion::error::Result;
10use datafusion::logical_expr::CreateExternalTable;
11use datafusion::sql::TableReference;
12use parking_lot::Mutex;
13use tracing::{debug, warn};
14
15use crate::connection::ClickHouseConnectionPool;
16use crate::providers::table::ClickHouseTableProvider;
17
18// TODO: Docs - especially explain the different ways to use this, the fact that it exists since it
19// wraps a `ClickHouseConnectionPool`, and how to use it with a `ClickHouseConnectionPoolBuilder`.
20//
21/// A table factory for creating `ClickHouse` [`TableProvider`]s.
22///
23/// Returns a federated table provider if the `federation` feature is enabled, otherwise a
24/// [`TableProvider`]
25#[derive(Debug, Clone)]
26pub struct ClickHouseTableFactory {
27    pool:          Arc<ClickHouseConnectionPool>,
28    coerce_schema: bool,
29}
30
31impl ClickHouseTableFactory {
32    pub fn new(pool: Arc<ClickHouseConnectionPool>) -> Self { Self { pool, coerce_schema: false } }
33
34    /// Get a reference to the connection pool.
35    pub fn pool(&self) -> &Arc<ClickHouseConnectionPool> { &self.pool }
36
37    /// Set whether to coerce the schema of the table provider.
38    #[must_use]
39    pub fn with_coercion(mut self, coerce_schema: bool) -> Self {
40        self.coerce_schema = coerce_schema;
41        self
42    }
43
44    /// # Errors
45    /// - Returns an error if the table provider cannot be created.
46    pub async fn table_provider(
47        &self,
48        table_reference: TableReference,
49    ) -> Result<Arc<dyn TableProvider + 'static>> {
50        let pool = Arc::clone(&self.pool);
51        debug!(%table_reference, "Creating ClickHouse table provider");
52
53        let provider = Arc::new(
54            ClickHouseTableProvider::try_new(pool, table_reference)
55                .await
56                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
57                .with_coercion(self.coerce_schema),
58        );
59
60        #[cfg(feature = "federation")]
61        let provider =
62            Arc::new(provider.create_federated_table_provider()) as Arc<dyn TableProvider>;
63
64        Ok(provider)
65    }
66
67    /// Create a table provider from a schema.
68    ///
69    /// # Errors
70    /// - Returns an error only in the federation feature path, if federating fails.
71    pub fn table_provider_from_schema(
72        &self,
73        table_reference: TableReference,
74        schema: SchemaRef,
75    ) -> Arc<dyn TableProvider + 'static> {
76        debug!(%table_reference, "Creating ClickHouse table provider from schema");
77        let provider = Arc::new(
78            ClickHouseTableProvider::new_with_schema_unchecked(
79                Arc::clone(&self.pool),
80                table_reference,
81                schema,
82            )
83            .with_coercion(self.coerce_schema),
84        );
85        #[cfg(feature = "federation")]
86        let provider = Arc::new(provider.create_federated_table_provider());
87        provider
88    }
89}
90
91/// A `DataFusion` [`TableProviderFactory`] for creating `ClickHouse` tables.
92#[derive(Debug, Clone)]
93pub struct ClickHouseTableProviderFactory {
94    pools:          Arc<Mutex<HashMap<Destination, ClickHouseConnectionPool>>>,
95    coerce_schemas: bool,
96}
97
98impl ClickHouseTableProviderFactory {
99    pub fn new() -> Self {
100        Self { pools: Arc::new(Mutex::new(HashMap::default())), coerce_schemas: false }
101    }
102
103    /// Set whether [`ClickHouseTableProvider`]s should attempt to coerce `RecordBatch` schemas to
104    /// the schema of the `LogicalPlan` being executed.
105    #[must_use]
106    pub fn with_coercion(mut self, coerce: bool) -> Self {
107        self.coerce_schemas = coerce;
108        self
109    }
110
111    pub fn coerce_schemas(&self) -> bool { self.coerce_schemas }
112}
113
114impl ClickHouseTableProviderFactory {
115    // TODO: Docs
116    /// # Errors
117    /// - Returns an error if creating the `ClickHouseConnectionPool` fails.
118    pub async fn new_with_builder(
119        endpoint: impl Into<Destination>,
120        builder: ArrowConnectionPoolBuilder,
121    ) -> Result<Self> {
122        let this = Self::new();
123        drop(this.attach_pool_builder(endpoint, builder).await?);
124        Ok(this)
125    }
126
127    // TODO: Docs
128    /// Attach an existing [`ClickHouseConnectionPool`] to the factory by providing
129    /// [`ArrowConnectionPoolBuilder`] which will be built into a [`ClickHouseConnectionPool`]
130    ///
131    /// # Errors
132    /// - Returns an error if creating the `ClickHouseConnectionPool` fails.
133    pub async fn attach_pool_builder(
134        &self,
135        endpoint: impl Into<Destination>,
136        builder: ArrowConnectionPoolBuilder,
137    ) -> Result<ClickHouseConnectionPool> {
138        let endpoint = endpoint.into();
139        debug!(?endpoint, "Attaching ClickHouse connection pool");
140
141        // Since this pool will be used for ddl, it's essential it connects to the "default" db
142        let builder = builder.configure_client(|c| c.with_database("default"));
143
144        // Create connection pool
145        let pool = ClickHouseConnectionPool::from_pool_builder(builder).await?;
146        debug!(?endpoint, "Connection pool created successfully");
147
148        // Update map
149        drop(self.pools.lock().insert(endpoint, pool.clone()));
150        Ok(pool)
151    }
152
153    /// Attach an existing [`ClickHouseConnectionPool`] to the factory directly.
154    #[cfg_attr(feature = "mocks", expect(clippy::needless_pass_by_value))]
155    pub fn attach_pool(
156        &self,
157        endpoint: impl Into<Destination>,
158        identifer: impl Into<String>,
159        #[cfg_attr(feature = "mocks", expect(unused))] pool: clickhouse_arrow::bb8::Pool<
160            ArrowConnectionManager,
161        >,
162    ) -> ClickHouseConnectionPool {
163        let endpoint = endpoint.into();
164        debug!(?endpoint, "Attaching ClickHouse connection pool");
165
166        #[cfg(not(feature = "mocks"))]
167        let pool = ClickHouseConnectionPool::new(identifer, pool);
168
169        #[cfg(feature = "mocks")]
170        let pool = ClickHouseConnectionPool::new(identifer, ());
171
172        debug!(?endpoint, "Connection pool created successfully");
173
174        // Update map
175        drop(self.pools.lock().insert(endpoint, pool.clone()));
176        pool
177    }
178
179    // TODO: Docs - explain how serialized params are used by `TableProviderFactory` below and how
180    // this method enables it
181    //
182    /// Get or create a connection pool from parameters, returning an existing connection pool if
183    /// the endpoint is already connected
184    async fn get_or_create_pool_from_params(
185        &self,
186        endpoint: &str,
187        params: &mut HashMap<String, String>,
188    ) -> Result<ClickHouseConnectionPool> {
189        if endpoint.is_empty() {
190            tracing::error!("Endpoint is required for ClickHouse, received empty value");
191            return exec_err!("Endpoint is required for ClickHouse");
192        }
193
194        let destination = Destination::from(endpoint);
195        if let Some(pool) = self.pools.lock().get(&destination) {
196            debug!("Pool exists for endpoint: {endpoint}");
197            return Ok(pool.clone());
198        }
199
200        // Parse options (e.g., host, port, database)
201        // NOTE: Settings are ignored since this path is for creating tables and settings will be
202        // delegated to table creation
203        let clickhouse_options = crate::utils::params_to_pool_builder(endpoint, params, true)?;
204
205        // Create connection pool
206        self.attach_pool_builder(destination, clickhouse_options).await
207    }
208}
209
210impl Default for ClickHouseTableProviderFactory {
211    fn default() -> Self { Self::new() }
212}
213
214#[async_trait]
215impl TableProviderFactory for ClickHouseTableProviderFactory {
216    async fn create(
217        &self,
218        _state: &dyn Session,
219        cmd: &CreateExternalTable,
220    ) -> Result<Arc<dyn TableProvider>> {
221        if !cmd.constraints.is_empty() {
222            warn!("Constraints not fully supported in ClickHouse; ignoring: {:?}", cmd.constraints);
223        }
224
225        let name = cmd.name.clone();
226        let mut params = cmd.options.clone();
227        let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
228
229        // Pull out endpoint
230        let endpoint =
231            params.get("endpoint").map(ToString::to_string).unwrap_or(cmd.location.clone());
232
233        // Pull out database
234        let database = name
235            .schema()
236            .or(params.get(crate::utils::DEFAULT_DATABASE_PARAM).map(String::as_str))
237            .unwrap_or("default")
238            .to_string();
239
240        // Get or create a clickhouse connection pool
241        let pool = Arc::new(
242            self.get_or_create_pool_from_params(&endpoint, &mut params)
243                .await
244                .inspect_err(|error| tracing::error!(?error, "Failed to create connection pool"))?,
245        );
246
247        // Ensure table reference is properly formatted
248        let name = match name {
249            t @ TableReference::Partial { .. } => t,
250            TableReference::Bare { table } => TableReference::partial(database.as_str(), table),
251            TableReference::Full { schema, table, .. } => TableReference::partial(schema, table),
252        };
253
254        debug!(?name, "Table provider factory creating schema");
255        // Get table options
256        let column_defaults = &cmd.column_defaults;
257        let create_options =
258            crate::utils::params::params_to_create_options(&mut params, column_defaults)
259                .inspect_err(|error| {
260                    tracing::error!(
261                        ?error,
262                        ?params,
263                        "Could not generate table options from params"
264                    );
265                })?;
266
267        // Create table and optionally database
268        crate::utils::create_schema(&name, &schema, &create_options, &pool, cmd.if_not_exists)
269            .await?;
270
271        // Create table provider
272        Ok(ClickHouseTableFactory::new(pool)
273            .with_coercion(self.coerce_schemas)
274            .table_provider_from_schema(name, schema))
275    }
276}