clickhouse_datafusion/providers/
table_factory.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use clickhouse_arrow::{ArrowConnectionManager, ArrowConnectionPoolBuilder, Destination};
6use datafusion::arrow::datatypes::SchemaRef;
7use datafusion::catalog::{Session, TableProvider, TableProviderFactory};
8use datafusion::common::exec_err;
9use datafusion::error::Result;
10use datafusion::logical_expr::CreateExternalTable;
11use datafusion::sql::TableReference;
12use parking_lot::Mutex;
13use tracing::{debug, warn};
14
15use crate::connection::ClickHouseConnectionPool;
16use crate::providers::table::ClickHouseTableProvider;
17
18#[derive(Debug, Clone)]
26pub struct ClickHouseTableFactory {
27 pool: Arc<ClickHouseConnectionPool>,
28 coerce_schema: bool,
29}
30
31impl ClickHouseTableFactory {
32 pub fn new(pool: Arc<ClickHouseConnectionPool>) -> Self { Self { pool, coerce_schema: false } }
33
34 pub fn pool(&self) -> &Arc<ClickHouseConnectionPool> { &self.pool }
36
37 #[must_use]
39 pub fn with_coercion(mut self, coerce_schema: bool) -> Self {
40 self.coerce_schema = coerce_schema;
41 self
42 }
43
44 pub async fn table_provider(
47 &self,
48 table_reference: TableReference,
49 ) -> Result<Arc<dyn TableProvider + 'static>> {
50 let pool = Arc::clone(&self.pool);
51 debug!(%table_reference, "Creating ClickHouse table provider");
52
53 let provider = Arc::new(
54 ClickHouseTableProvider::try_new(pool, table_reference)
55 .await
56 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
57 .with_coercion(self.coerce_schema),
58 );
59
60 #[cfg(feature = "federation")]
61 let provider =
62 Arc::new(provider.create_federated_table_provider()) as Arc<dyn TableProvider>;
63
64 Ok(provider)
65 }
66
67 pub fn table_provider_from_schema(
72 &self,
73 table_reference: TableReference,
74 schema: SchemaRef,
75 ) -> Arc<dyn TableProvider + 'static> {
76 debug!(%table_reference, "Creating ClickHouse table provider from schema");
77 let provider = Arc::new(
78 ClickHouseTableProvider::new_with_schema_unchecked(
79 Arc::clone(&self.pool),
80 table_reference,
81 schema,
82 )
83 .with_coercion(self.coerce_schema),
84 );
85 #[cfg(feature = "federation")]
86 let provider = Arc::new(provider.create_federated_table_provider());
87 provider
88 }
89}
90
91#[derive(Debug, Clone)]
93pub struct ClickHouseTableProviderFactory {
94 pools: Arc<Mutex<HashMap<Destination, ClickHouseConnectionPool>>>,
95 coerce_schemas: bool,
96}
97
98impl ClickHouseTableProviderFactory {
99 pub fn new() -> Self {
100 Self { pools: Arc::new(Mutex::new(HashMap::default())), coerce_schemas: false }
101 }
102
103 #[must_use]
106 pub fn with_coercion(mut self, coerce: bool) -> Self {
107 self.coerce_schemas = coerce;
108 self
109 }
110
111 pub fn coerce_schemas(&self) -> bool { self.coerce_schemas }
112}
113
114impl ClickHouseTableProviderFactory {
115 pub async fn new_with_builder(
119 endpoint: impl Into<Destination>,
120 builder: ArrowConnectionPoolBuilder,
121 ) -> Result<Self> {
122 let this = Self::new();
123 drop(this.attach_pool_builder(endpoint, builder).await?);
124 Ok(this)
125 }
126
127 pub async fn attach_pool_builder(
134 &self,
135 endpoint: impl Into<Destination>,
136 builder: ArrowConnectionPoolBuilder,
137 ) -> Result<ClickHouseConnectionPool> {
138 let endpoint = endpoint.into();
139 debug!(?endpoint, "Attaching ClickHouse connection pool");
140
141 let builder = builder.configure_client(|c| c.with_database("default"));
143
144 let pool = ClickHouseConnectionPool::from_pool_builder(builder).await?;
146 debug!(?endpoint, "Connection pool created successfully");
147
148 drop(self.pools.lock().insert(endpoint, pool.clone()));
150 Ok(pool)
151 }
152
153 #[cfg_attr(feature = "mocks", expect(clippy::needless_pass_by_value))]
155 pub fn attach_pool(
156 &self,
157 endpoint: impl Into<Destination>,
158 identifer: impl Into<String>,
159 #[cfg_attr(feature = "mocks", expect(unused))] pool: clickhouse_arrow::bb8::Pool<
160 ArrowConnectionManager,
161 >,
162 ) -> ClickHouseConnectionPool {
163 let endpoint = endpoint.into();
164 debug!(?endpoint, "Attaching ClickHouse connection pool");
165
166 #[cfg(not(feature = "mocks"))]
167 let pool = ClickHouseConnectionPool::new(identifer, pool);
168
169 #[cfg(feature = "mocks")]
170 let pool = ClickHouseConnectionPool::new(identifer, ());
171
172 debug!(?endpoint, "Connection pool created successfully");
173
174 drop(self.pools.lock().insert(endpoint, pool.clone()));
176 pool
177 }
178
179 async fn get_or_create_pool_from_params(
185 &self,
186 endpoint: &str,
187 params: &mut HashMap<String, String>,
188 ) -> Result<ClickHouseConnectionPool> {
189 if endpoint.is_empty() {
190 tracing::error!("Endpoint is required for ClickHouse, received empty value");
191 return exec_err!("Endpoint is required for ClickHouse");
192 }
193
194 let destination = Destination::from(endpoint);
195 if let Some(pool) = self.pools.lock().get(&destination) {
196 debug!("Pool exists for endpoint: {endpoint}");
197 return Ok(pool.clone());
198 }
199
200 let clickhouse_options = crate::utils::params_to_pool_builder(endpoint, params, true)?;
204
205 self.attach_pool_builder(destination, clickhouse_options).await
207 }
208}
209
210impl Default for ClickHouseTableProviderFactory {
211 fn default() -> Self { Self::new() }
212}
213
214#[async_trait]
215impl TableProviderFactory for ClickHouseTableProviderFactory {
216 async fn create(
217 &self,
218 _state: &dyn Session,
219 cmd: &CreateExternalTable,
220 ) -> Result<Arc<dyn TableProvider>> {
221 if !cmd.constraints.is_empty() {
222 warn!("Constraints not fully supported in ClickHouse; ignoring: {:?}", cmd.constraints);
223 }
224
225 let name = cmd.name.clone();
226 let mut params = cmd.options.clone();
227 let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
228
229 let endpoint =
231 params.get("endpoint").map(ToString::to_string).unwrap_or(cmd.location.clone());
232
233 let database = name
235 .schema()
236 .or(params.get(crate::utils::DEFAULT_DATABASE_PARAM).map(String::as_str))
237 .unwrap_or("default")
238 .to_string();
239
240 let pool = Arc::new(
242 self.get_or_create_pool_from_params(&endpoint, &mut params)
243 .await
244 .inspect_err(|error| tracing::error!(?error, "Failed to create connection pool"))?,
245 );
246
247 let name = match name {
249 t @ TableReference::Partial { .. } => t,
250 TableReference::Bare { table } => TableReference::partial(database.as_str(), table),
251 TableReference::Full { schema, table, .. } => TableReference::partial(schema, table),
252 };
253
254 debug!(?name, "Table provider factory creating schema");
255 let column_defaults = &cmd.column_defaults;
257 let create_options =
258 crate::utils::params::params_to_create_options(&mut params, column_defaults)
259 .inspect_err(|error| {
260 tracing::error!(
261 ?error,
262 ?params,
263 "Could not generate table options from params"
264 );
265 })?;
266
267 crate::utils::create_schema(&name, &schema, &create_options, &pool, cmd.if_not_exists)
269 .await?;
270
271 Ok(ClickHouseTableFactory::new(pool)
273 .with_coercion(self.coerce_schemas)
274 .table_provider_from_schema(name, schema))
275 }
276}