// clickhouse_datafusion/builders.rs
1//! The various builders to make registering, updating, and synchronizing remote `ClickHouse`
2//! databases and `DataFusion` easier.
3//!
4//! # `ClickHouseBuilder`
5//!
6//! You begin by creating a `ClickHouseBuilder` instance, which allows you to configure various
7//! aspects of the connection, such as the endpoint, authentication, the underlying `ClickHouse`
8//! connection pool, `clickhouse-arrow` `Client` options, schema mapping, and more.
9//!
10//! NOTE: The schema (database) set as the default on the underlying `ClickHouse` client will be set
11//! as the initial schema after calling `ClickHouseBuilder::build_catalog`. This can be changed on
12//! the fly by calling `with_schema` on the catalog builder returned.
13//!
14//! ## Schema Coercion
15//!
16//! You can also specify whether "schema coercion" should occur during streaming of the data
17//! returned by query execution. If using the full [`super::context::ClickHouseSessionContext`] and
18//! [`super::context::ClickHouseQueryPlanner`], required if you plan on using `ClickHouse`
19//! functions, then the return type provided to the second argument of the `clickhouse(...)` scalar
20//! UDFs will specify the expected return type.
21//!
22//! This means you must be exact, otherwise an arrow error will be returned. By configuring schema
23//! coercion, anything that can be coerced via arrow will be coerced, based on the expected schema
24//! of the `clickhouse` function mentioned above.
25//!
26//! Generally schema conversion should be avoided, as there is a non-zero cost. The cost is incurred
27//! per `RecordBatch` as the results are streamed through `DataFusion`'s execution context, which
28//! means it will have a per `RecordBatch` cost that can be avoided by simply providing the correct
29//! return type to the clickhouse function. But, for non latency-sensitive use cases, this may
30//! provide convenience, especially when dealing with `ClickHouse` higher order functions.
31//!
32//! # `ClickHouseCatalogBuilder`
33//!
34//! After configuration of the `ClickHouseBuilder`, a `ClickHouseCatalogBuilder` will be obtained by
35//! calling `ClickHouseBuilder::build_catalog`. The `ClickHouseCatalogBuilder` returned can then be
//! used to set the schema being targeted, create tables on the remote schema, register existing
37//! tables under aliases, and refresh `DataFusion`'s internal catalog to keep the two in sync.
38//!
//! Refer to e2e tests in the repository for detailed examples on its usage, especially the tests'
40//! associated "helper" functions. The `basic` example shows a basic usage of the
41//! `ClickHouseBuilder` that suffices for most use cases.
42use std::collections::HashMap;
43use std::sync::Arc;
44
45use clickhouse_arrow::prelude::ClickHouseEngine;
46use clickhouse_arrow::{
47 ArrowConnectionManager, ArrowConnectionPoolBuilder, ArrowOptions, ArrowPoolBuilder,
48 ClientBuilder, CreateOptions, Destination,
49};
50use datafusion::arrow::datatypes::SchemaRef;
51use datafusion::catalog::{CatalogProvider, TableProviderFactory};
52use datafusion::common::{Constraints, DFSchema};
53use datafusion::error::{DataFusionError, Result};
54use datafusion::logical_expr::CreateExternalTable;
55use datafusion::prelude::*;
56use datafusion::sql::TableReference;
57use tracing::{debug, error};
58
59use crate::connection::ClickHouseConnectionPool;
60use crate::providers::catalog::ClickHouseCatalogProvider;
61use crate::providers::table_factory::ClickHouseTableFactory;
62use crate::table_factory::ClickHouseTableProviderFactory;
63use crate::utils::{self, ENDPOINT_PARAM, default_str_to_expr, register_builtins};
64
/// The default `DataFusion` catalog name if no other name is provided.
pub const DEFAULT_CLICKHOUSE_CATALOG: &str = "clickhouse";
67
68/// Simple function to provide default [`ArrowOptions`] fit for common use
69pub fn default_arrow_options() -> ArrowOptions {
70 ArrowOptions::default()
71 .with_strings_as_strings(true)
72 .with_strict_schema(false)
73 .with_disable_strict_schema_ddl(true)
74 .with_nullable_array_default_empty(true)
75}
76
/// Entrypoint builder for `ClickHouse` and `DataFusion` integration.
///
/// Ensures `DataFusion` default UDFs as well as `ClickHouse` udfs are registered in
/// [`SessionContext`], the pool is attached to the [`ClickHouseTableProviderFactory`], and
/// `ClickHouse` is reachable by the provided endpoint.
///
/// NOTE: While `clickhouse_arrow` defaults to binary encoding for strings (via
/// [`clickhouse_arrow::ArrowOptions::strings_as_strings`] == false), for `DataFusion` the
/// default is `true`. This can be disabled by modifying the setting via
/// [`ClickHouseBuilder::configure_arrow_options`]
pub struct ClickHouseBuilder {
    /// Remote `ClickHouse` destination the pool will connect to.
    endpoint: Destination,
    /// Builder for the underlying `clickhouse-arrow` connection pool; configured via
    /// `configure_client` / `configure_pool` / `configure_arrow_options`.
    pool_builder: ArrowConnectionPoolBuilder,
    /// Table provider factory the pool is attached to during `build_catalog`.
    factory: ClickHouseTableProviderFactory,
}
92
93impl ClickHouseBuilder {
94 /// Create new `ClickHouseBuilder` that can be used to configure both the
95 /// [`clickhouse_arrow::ClientBuilder`], the [`clickhouse_arrow::ArrowPoolBuilder`], and be
96 /// "built" into a [`ClickHouseCatalogBuilder`] for interacting with `ClickHouse` databases and
97 /// tables.
98 pub fn new(endpoint: impl Into<Destination>) -> Self {
99 let endpoint = endpoint.into();
100 let pool_builder = ArrowConnectionPoolBuilder::new(endpoint.clone())
101 .configure_client(|c| c.with_arrow_options(default_arrow_options()));
102 Self { endpoint, pool_builder, factory: ClickHouseTableProviderFactory::new() }
103 }
104
105 /// Create new `ClickHouseBuilder` that can be used to configure both the
106 /// [`clickhouse_arrow::ClientBuilder`], the [`clickhouse_arrow::ArrowPoolBuilder`], and be
107 /// "built" into a [`ClickHouseCatalogBuilder`] for interacting with `ClickHouse` databases and
108 /// tables.
109 pub fn new_with_pool_builder(
110 endpoint: impl Into<Destination>,
111 pool_builder: ArrowConnectionPoolBuilder,
112 ) -> Self {
113 let endpoint = endpoint.into();
114 Self { endpoint, pool_builder, factory: ClickHouseTableProviderFactory::new() }
115 }
116
117 /// Configure whether to coerce schemas to match expected schemas during query execution.
118 /// Remember, there is a non-zero cost incurred per `RecordBatch`, and this is mainly useful for
119 /// allowing looser return types when using clickhouse functions.
120 #[must_use]
121 pub fn with_coercion(mut self, coerce: bool) -> Self {
122 self.factory = self.factory.with_coercion(coerce);
123 self
124 }
125
126 /// Configure the underlying [`clickhouse_arrow::ClientBuilder`].
127 #[must_use]
128 pub fn configure_client(mut self, f: impl FnOnce(ClientBuilder) -> ClientBuilder) -> Self {
129 self.pool_builder = self.pool_builder.configure_client(f);
130 self
131 }
132
133 /// Configure the underlying [`clickhouse_arrow::ArrowPoolBuilder`].
134 #[must_use]
135 pub fn configure_pool(mut self, f: impl FnOnce(ArrowPoolBuilder) -> ArrowPoolBuilder) -> Self {
136 self.pool_builder = self.pool_builder.configure_pool(f);
137 self
138 }
139
140 /// `clickhouse_arrow` defaults to binary encoding for
141 /// [`datafusion::arrow::datatypes::DataType::Utf8`] columns, the default is inverted for
142 /// `DataFusion`. This method allows disabling that change to use binary encoding for strings,
143 /// among other configuration options.
144 ///
145 /// See: [`clickhouse_arrow::ArrowOptions`]
146 #[must_use]
147 pub fn configure_arrow_options(mut self, f: impl FnOnce(ArrowOptions) -> ArrowOptions) -> Self {
148 self.pool_builder = self.pool_builder.configure_client(|c| {
149 let options = c.options().ext.arrow.unwrap_or(default_arrow_options());
150 c.with_arrow_options(f(options))
151 });
152 self
153 }
154
155 /// Build ensures `ClickHouse` builtins are registered (such as nested functions), the pool is
156 /// attached to the factory, the `ClickHouse` endpoint is reachable, and the catalog is created
157 /// and registered to the [`SessionContext`].
158 ///
159 /// NOTE: The `pool_identifier` is used in the case when `federation` is enabled. It will
160 /// indicate if the underlying connection references the same connection or not.
161 ///
162 /// # Errors
163 /// - Returns an error if the `ClickHouse` endpoint is unreachable
164 /// - Returns an error if the `ClickHouse` catalog fails to be created
165 pub async fn build_catalog_from_pool(
166 ctx: &SessionContext,
167 endpoint: impl Into<String>,
168 catalog: Option<&str>,
169 pool_identifier: impl Into<String>,
170 pool: clickhouse_arrow::bb8::Pool<ArrowConnectionManager>,
171 ) -> Result<ClickHouseCatalogBuilder> {
172 // Register built in functions and clickhouse udfs
173 register_builtins(ctx);
174
175 let catalog = catalog.unwrap_or(DEFAULT_CLICKHOUSE_CATALOG).to_string();
176 debug!(catalog, "Attaching pool to ClickHouse table factory");
177
178 let endpoint = endpoint.into();
179 let factory = ClickHouseTableProviderFactory::new();
180 let pool = Arc::new(factory.attach_pool(&endpoint, pool_identifier, pool));
181
182 ClickHouseCatalogBuilder::try_new(ctx, catalog, "", endpoint, pool, factory).await
183 }
184
185 /// Build ensures `ClickHouse` builtins are registered (such as nested functions), the pool is
186 /// attached to the factory, the `ClickHouse` endpoint is reachable, and the catalog is created
187 /// and registered to the [`SessionContext`].
188 ///
189 /// # Errors
190 /// - Returns an error if the `ClickHouse` endpoint is unreachable
191 /// - Returns an error if the `ClickHouse` catalog fails to be created
192 pub async fn build_catalog(
193 self,
194 ctx: &SessionContext,
195 catalog: Option<&str>,
196 ) -> Result<ClickHouseCatalogBuilder> {
197 // Register built in functions and clickhouse udfs
198 register_builtins(ctx);
199
200 let catalog = catalog.unwrap_or(DEFAULT_CLICKHOUSE_CATALOG).to_string();
201 let database = self.pool_builder.client_options().default_database.clone();
202 debug!(catalog, database, "Attaching pool to ClickHouse table factory");
203
204 let endpoint = self.endpoint.to_string();
205 let factory = self.factory;
206 let pool = Arc::new(factory.attach_pool_builder(self.endpoint, self.pool_builder).await?);
207
208 ClickHouseCatalogBuilder::try_new(ctx, catalog, database, endpoint, pool, factory).await
209 }
210}
211
/// [`ClickHouseCatalogBuilder`] can be used to create tables, register existing tables, and finally
/// refresh the `ClickHouse` catalog in `DataFusion`.
///
/// IMPORTANT! After creating tables, one of the build variations, ie `Self::build` or
/// `Self::build_schema`, must be called to ensure the catalog provider is up to date with the
/// remote `ClickHouse` database. If you forget to do this, `DataFusion` queries targeting one of
/// these tables will fail.
#[derive(Clone)]
pub struct ClickHouseCatalogBuilder {
    /// The name this catalog is registered under in `DataFusion`'s context.
    catalog: String,
    /// The current schema the builder is targeting.
    schema: String,
    /// The configured remote endpoint the underlying `ClickHouse` pool is connected.
    endpoint: String,
    /// The `ClickHouse` connection used to communicate with the remote `ClickHouse` database.
    pool: Arc<ClickHouseConnectionPool>,
    /// This factory is used to create new tables in the remote `ClickHouse` database. This builder
    /// must be built with one of the builder variations, ie `Self::build` or `Self::build_schema`,
    /// so that the provider reflects the most current remote schema.
    factory: ClickHouseTableProviderFactory,
    /// The catalog provider is a passive catalog provider, meaning it must be "refreshed" after
    /// table creation or schema change of any type to the remote `ClickHouse` database.
    provider: Arc<ClickHouseCatalogProvider>,
}
236
237impl ClickHouseCatalogBuilder {
238 // TODO: Docs
239 /// Internal constructor
240 async fn try_new(
241 ctx: &SessionContext,
242 catalog: impl Into<String>,
243 default_schema: impl Into<String>,
244 endpoint: impl Into<String>,
245 pool: Arc<ClickHouseConnectionPool>,
246 factory: ClickHouseTableProviderFactory,
247 ) -> Result<Self> {
248 let schema = default_schema.into();
249 let catalog = catalog.into();
250 let endpoint = endpoint.into();
251
252 // Must have a schema
253 let schema = if schema.is_empty() { "default".to_string() } else { schema };
254
255 // Ensure the database exists if not default
256 if schema != "default" {
257 debug!(schema, "Database not default, attempting create");
258 utils::create_database(&schema, &pool).await?;
259 }
260
261 // Register catalog or create a new one
262 let provider = if factory.coerce_schemas() {
263 ClickHouseCatalogProvider::try_new_with_coercion(Arc::clone(&pool)).await
264 } else {
265 ClickHouseCatalogProvider::try_new(Arc::clone(&pool)).await
266 }
267 .inspect_err(|error| error!(?error, "Failed to register catalog {catalog}"))?;
268
269 let provider = Arc::new(provider);
270
271 drop(
272 ctx.register_catalog(
273 catalog.as_str(),
274 Arc::clone(&provider) as Arc<dyn CatalogProvider>,
275 ),
276 );
277
278 Ok(ClickHouseCatalogBuilder { catalog, schema, endpoint, pool, factory, provider })
279 }
280
281 /// Return the name of the catalog in `DataFusion`'s context that this builder is configuring.
282 pub fn name(&self) -> &str { &self.catalog }
283
284 /// Return the currently set schema (database) being targeted. Can be changed on the fly by
285 /// calling `Self::with_schema`.
286 pub fn schema(&self) -> &str { &self.schema }
287
288 /// Update the current "schema" (database) that this builder is targeting, and continue
289 /// building.
290 ///
291 /// # Errors
292 /// - Returns an error if the schema needs to be created and fails
293 pub async fn with_schema(mut self, name: impl Into<String>) -> Result<Self> {
294 let name = name.into();
295 // Don't re-create if schema is already set
296 if name == self.schema {
297 return Ok(self);
298 }
299 self.schema = name;
300
301 // Ensure the database exists if not default
302 if self.schema != "default" {
303 debug!(schema = self.schema, "Database not default, attempting create");
304 utils::create_database(&self.schema, &self.pool).await?;
305 }
306
307 // Be sure to refresh the catalog before setting the schema
308 self.provider.refresh_catalog(&self.pool).await?;
309
310 Ok(self)
311 }
312
313 /// Create a new table in the remote `ClickHouse` instance.
314 ///
315 /// # Arguments
316 /// - `name`: The name of the table to create.
317 /// - `engine`: The engine to use for the table.
318 /// - `schema`: The schema of the table.
319 ///
320 /// # Returns
321 /// - A [`ClickHouseTableCreator`] that can be used to create tables in the remote `ClickHouse`
322 /// instance.
323 pub fn with_new_table(
324 &self,
325 name: impl Into<String>,
326 engine: impl Into<ClickHouseEngine>,
327 schema: SchemaRef,
328 ) -> ClickHouseTableCreator {
329 let table = name.into();
330 let options = CreateOptions::new(engine.into().to_string());
331 debug!(schema = self.schema, table, ?options, "Initializing table creator");
332 ClickHouseTableCreator { name: table, builder: self.clone(), options, schema }
333 }
334
335 /// Create a new table in the remote `ClickHouse` instance.
336 ///
337 /// # Arguments
338 /// - `name`: The name of the table to create.
339 /// - `schema`: The schema of the table.
340 /// - `options`: More detailed `CreateOptions` for creating the provided table.
341 ///
342 /// # Returns
343 /// - A [`ClickHouseTableCreator`] that can be used to create tables in the remote `ClickHouse`
344 /// instance.
345 pub fn with_new_table_and_options(
346 &self,
347 name: impl Into<String>,
348 schema: SchemaRef,
349 options: CreateOptions,
350 ) -> ClickHouseTableCreator {
351 let table = name.into();
352 debug!(schema = self.schema, table, ?options, "Initializing table creator");
353 ClickHouseTableCreator { name: table, builder: self.clone(), options, schema }
354 }
355
356 /// Register an existing `ClickHouse` table, optionally renaming it in the provided session
357 /// state.
358 ///
359 /// # Errors
360 /// - Returns an error if the table does not exist in the remote database
361 /// - Returns an error if the table cannot be registered to the context
362 pub async fn register_existing_table(
363 &self,
364 name: impl Into<TableReference>,
365 name_as: Option<impl Into<TableReference>>,
366 ctx: &SessionContext,
367 ) -> Result<()> {
368 let name = name.into();
369 let database = name.schema().unwrap_or(&self.schema);
370 let exists =
371 self.pool.connect().await?.tables(database).await?.contains(&name.table().to_string());
372
373 if !exists {
374 return Err(DataFusionError::Plan(format!(
375 "Table '{name}' does not exist in ClickHouse database '{database}', use \
376 `table_creator` instead"
377 )));
378 }
379
380 let table = TableReference::full(self.catalog.as_str(), database, name.table());
381 let table_as = name_as.map(Into::into).unwrap_or(table.clone());
382
383 let factory = ClickHouseTableFactory::new(Arc::clone(&self.pool));
384 let provider = factory.table_provider(table).await?;
385 debug!(?table_as, "Registering ClickHouse table provider");
386 drop(ctx.register_table(table_as, provider)?);
387
388 Ok(())
389 }
390
391 /// Build the current `schema` (database) being managed by this catalog, optionally registering
392 /// a new schema to continue building
393 ///
394 /// Note: For the `SessionContext` to recognize the added tables and updated schema, either this
395 /// function or `Self::build` must be called.
396 ///
397 /// # Errors
398 /// - Returnes an error if an error occurs while refreshing the catalog
399 pub async fn build_schema(
400 mut self,
401 new_schema: Option<String>,
402 ctx: &SessionContext,
403 ) -> Result<Self> {
404 let _catalog = self.build_internal(ctx).await?;
405 self.schema = new_schema.unwrap_or(self.schema);
406 Ok(self)
407 }
408
409 /// Re-register the catalog, updating the [`SessionContext`], and return the updated context.
410 ///
411 /// Note: Important! For the [`SessionContext`] to recognize the added tables and updated
412 /// schema, either this function or `Self::build` must be called. For that reason, it is
413 /// important to use the [`SessionContext`] provided back from this function.
414 ///
415 /// # Errors
416 /// - Returns an error if the `SessionContext` has not been federated
417 /// - Returnes an error if an error occurs while refreshing the catalog
418 /// - Returns an error if the "federation" feature is enabled but the context is not federated
419 pub async fn build(&self, ctx: &SessionContext) -> Result<Arc<ClickHouseCatalogProvider>> {
420 #[cfg(feature = "federation")]
421 {
422 use datafusion::common::exec_err;
423
424 use crate::federation::FederatedContext as _;
425
426 if !ctx.is_federated() {
427 return exec_err!(
428 "Building this schema with federation enabled but no federated SessionContext \
429 will fail. Call `ctx.federate()` before providing a context to build with."
430 );
431 }
432 }
433
434 self.build_internal(ctx).await
435 }
436
437 /// Re-registers the catalog provider, re-configures the [`SessionContext`], and return a
438 /// clone of the catalog.
439 ///
440 /// # Errors
441 /// - Returnes an error if an error occurs while refreshing the catalog
442 async fn build_internal(&self, ctx: &SessionContext) -> Result<Arc<ClickHouseCatalogProvider>> {
443 let catalog = Arc::clone(&self.provider);
444 debug!(catalog = self.catalog, "ClickHouse catalog created");
445
446 // Re-register UDFs
447 register_builtins(ctx);
448
449 // Re-register catalog
450 catalog.refresh_catalog(&self.pool).await?;
451 drop(ctx.register_catalog(&self.catalog, Arc::clone(&catalog) as Arc<dyn CatalogProvider>));
452
453 Ok(catalog)
454 }
455}
456
/// Builder phase for creating `ClickHouse` tables.
#[derive(Clone)]
pub struct ClickHouseTableCreator {
    /// The (unqualified) name of the table to create.
    name: String,
    /// The catalog builder returned after the table is created.
    builder: ClickHouseCatalogBuilder,
    /// The arrow schema of the table to create.
    schema: SchemaRef,
    /// `ClickHouse` `CREATE TABLE` options (engine, defaults, etc.).
    options: CreateOptions,
}
465
466impl ClickHouseTableCreator {
467 /// Update the underlying table create options that will be passed to clickhouse.
468 ///
469 /// See [`CreateOptions`] for more information.
470 #[must_use]
471 pub fn update_create_options(
472 mut self,
473 update: impl Fn(CreateOptions) -> CreateOptions,
474 ) -> Self {
475 self.options = update(self.options);
476 self
477 }
478
479 /// Create the table, returning back a [`ClickHouseCatalogBuilder`] to create more tables.
480 ///
481 /// # Errors
482 /// - Returns an error if the `TableProviderFactory` fails to create the table
483 /// - Returnes an error if an error occurs while refreshing the catalog
484 pub async fn create(self, ctx: &SessionContext) -> Result<ClickHouseCatalogBuilder> {
485 let schema = self.builder.schema.clone();
486 let table = self.name;
487
488 let column_defaults = self
489 .options
490 .clone()
491 .defaults
492 .unwrap_or_default()
493 .into_iter()
494 .map(|(col, value)| (col, default_str_to_expr(&value)))
495 .collect::<HashMap<_, _>>();
496
497 let mut options = utils::create_options_to_params(self.options).into_params();
498 drop(options.insert(ENDPOINT_PARAM.into(), self.builder.endpoint.clone()));
499
500 let table_ref = TableReference::partial(schema.as_str(), table.as_str());
501 let cmd = CreateExternalTable {
502 name: table_ref.clone(),
503 schema: Arc::new(DFSchema::try_from(Arc::clone(&self.schema))?),
504 options,
505 column_defaults,
506 constraints: Constraints::default(),
507 table_partition_cols: vec![],
508 if_not_exists: false,
509 location: String::new(),
510 file_type: String::new(),
511 temporary: false,
512 definition: None,
513 order_exprs: vec![],
514 unbounded: false,
515 };
516 let _provider = self
517 .builder
518 .factory
519 .create(&ctx.state(), &cmd)
520 .await
521 .inspect_err(|error| error!(?error, table, "Factory error creating table"))?;
522 debug!(table, "Table created, catalog will be refreshed in `build`");
523
524 // Refresh the catalog after creating the table
525 drop(self.builder.build_internal(ctx).await?);
526
527 Ok(self.builder)
528 }
529}