clickhouse_datafusion/providers/
table_factory.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use clickhouse_arrow::{ArrowConnectionPoolBuilder, Destination};
6use datafusion::arrow::datatypes::SchemaRef;
7use datafusion::catalog::{Session, TableProvider, TableProviderFactory};
8use datafusion::common::exec_err;
9use datafusion::error::Result;
10use datafusion::logical_expr::CreateExternalTable;
11use datafusion::sql::TableReference;
12use parking_lot::Mutex;
13use tracing::{debug, warn};
14
15use crate::connection::ClickHouseConnectionPool;
16use crate::providers::table::ClickHouseTableProvider;
17
18#[derive(Debug, Clone)]
26pub struct ClickHouseTableFactory {
27 pool: Arc<ClickHouseConnectionPool>,
28 coerce_schema: bool,
29}
30
31impl ClickHouseTableFactory {
32 pub fn new(pool: Arc<ClickHouseConnectionPool>) -> Self { Self { pool, coerce_schema: false } }
33
34 pub fn pool(&self) -> &Arc<ClickHouseConnectionPool> { &self.pool }
36
37 #[must_use]
39 pub fn with_coercion(mut self, coerce_schema: bool) -> Self {
40 self.coerce_schema = coerce_schema;
41 self
42 }
43
44 pub async fn table_provider(
47 &self,
48 table_reference: TableReference,
49 ) -> Result<Arc<dyn TableProvider + 'static>> {
50 let pool = Arc::clone(&self.pool);
51 debug!(%table_reference, "Creating ClickHouse table provider");
52
53 let provider = Arc::new(
54 ClickHouseTableProvider::try_new(pool, table_reference)
55 .await
56 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
57 .with_coercion(self.coerce_schema),
58 );
59
60 #[cfg(feature = "federation")]
61 let provider =
62 Arc::new(provider.create_federated_table_provider()) as Arc<dyn TableProvider>;
63
64 Ok(provider)
65 }
66
67 pub fn table_provider_from_schema(
72 &self,
73 table_reference: TableReference,
74 schema: SchemaRef,
75 ) -> Arc<dyn TableProvider + 'static> {
76 debug!(%table_reference, "Creating ClickHouse table provider from schema");
77 let provider = Arc::new(
78 ClickHouseTableProvider::new_with_schema_unchecked(
79 Arc::clone(&self.pool),
80 table_reference,
81 schema,
82 )
83 .with_coercion(self.coerce_schema),
84 );
85 #[cfg(feature = "federation")]
86 let provider = Arc::new(provider.create_federated_table_provider());
87 provider
88 }
89}
90
91#[derive(Debug, Clone)]
93pub struct ClickHouseTableProviderFactory {
94 pools: Arc<Mutex<HashMap<Destination, ClickHouseConnectionPool>>>,
95 coerce_schemas: bool,
96}
97
98impl ClickHouseTableProviderFactory {
99 pub fn new() -> Self {
100 Self { pools: Arc::new(Mutex::new(HashMap::default())), coerce_schemas: false }
101 }
102
103 #[must_use]
106 pub fn with_coercion(mut self, coerce: bool) -> Self {
107 self.coerce_schemas = coerce;
108 self
109 }
110
111 pub fn coerce_schemas(&self) -> bool { self.coerce_schemas }
112}
113
114impl ClickHouseTableProviderFactory {
115 pub async fn new_with_builder(
119 endpoint: impl Into<Destination>,
120 builder: ArrowConnectionPoolBuilder,
121 ) -> Result<Self> {
122 let this = Self::new();
123 drop(this.attach_pool_builder(endpoint, builder).await?);
124 Ok(this)
125 }
126
127 pub async fn attach_pool_builder(
134 &self,
135 endpoint: impl Into<Destination>,
136 builder: ArrowConnectionPoolBuilder,
137 ) -> Result<ClickHouseConnectionPool> {
138 let endpoint = endpoint.into();
139 debug!(?endpoint, "Attaching ClickHouse connection pool");
140
141 let builder = builder.configure_client(|c| c.with_database("default"));
143
144 let pool = ClickHouseConnectionPool::from_pool_builder(builder).await?;
146 debug!(?endpoint, "Connection pool created successfully");
147
148 drop(self.pools.lock().insert(endpoint, pool.clone()));
150 Ok(pool)
151 }
152
153 async fn get_or_create_pool_from_params(
159 &self,
160 endpoint: &str,
161 params: &mut HashMap<String, String>,
162 ) -> Result<ClickHouseConnectionPool> {
163 if endpoint.is_empty() {
164 tracing::error!("Endpoint is required for ClickHouse, received empty value");
165 return exec_err!("Endpoint is required for ClickHouse");
166 }
167
168 let destination = Destination::from(endpoint);
169 if let Some(pool) = self.pools.lock().get(&destination) {
170 debug!("Pool exists for endpoint: {endpoint}");
171 return Ok(pool.clone());
172 }
173
174 let clickhouse_options = crate::utils::params_to_pool_builder(endpoint, params, true)?;
178
179 self.attach_pool_builder(destination, clickhouse_options).await
181 }
182}
183
184impl Default for ClickHouseTableProviderFactory {
185 fn default() -> Self { Self::new() }
186}
187
188#[async_trait]
189impl TableProviderFactory for ClickHouseTableProviderFactory {
190 async fn create(
191 &self,
192 _state: &dyn Session,
193 cmd: &CreateExternalTable,
194 ) -> Result<Arc<dyn TableProvider>> {
195 if !cmd.constraints.is_empty() {
196 warn!("Constraints not fully supported in ClickHouse; ignoring: {:?}", cmd.constraints);
197 }
198
199 let name = cmd.name.clone();
200 let mut params = cmd.options.clone();
201 let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
202
203 let endpoint =
205 params.get("endpoint").map(ToString::to_string).unwrap_or(cmd.location.clone());
206
207 let database = name
209 .schema()
210 .or(params.get(crate::utils::DEFAULT_DATABASE_PARAM).map(String::as_str))
211 .unwrap_or("default")
212 .to_string();
213
214 let pool = Arc::new(
216 self.get_or_create_pool_from_params(&endpoint, &mut params)
217 .await
218 .inspect_err(|error| tracing::error!(?error, "Failed to create connection pool"))?,
219 );
220
221 let name = match name {
223 t @ TableReference::Partial { .. } => t,
224 TableReference::Bare { table } => TableReference::partial(database.as_str(), table),
225 TableReference::Full { schema, table, .. } => TableReference::partial(schema, table),
226 };
227
228 debug!(?name, "Table provider factory creating schema");
229 let column_defaults = &cmd.column_defaults;
231 let create_options =
232 crate::utils::params::params_to_create_options(&mut params, column_defaults)
233 .inspect_err(|error| {
234 tracing::error!(
235 ?error,
236 ?params,
237 "Could not generate table options from params"
238 );
239 })?;
240
241 crate::utils::create_schema(&name, &schema, &create_options, &pool, cmd.if_not_exists)
243 .await?;
244
245 Ok(ClickHouseTableFactory::new(pool)
247 .with_coercion(self.coerce_schemas)
248 .table_provider_from_schema(name, schema))
249 }
250}