clickhouse_datafusion/
builders.rs

//! The various builders to make registering, updating, and synchronizing remote `ClickHouse`
//! databases and `DataFusion` easier.
//!
//! # `ClickHouseBuilder`
//!
//! You begin by creating a `ClickHouseBuilder` instance, which allows you to configure various
//! aspects of the connection, such as the endpoint, authentication, the underlying `ClickHouse`
//! connection pool, `clickhouse-arrow` `Client` options, schema mapping, and more.
//!
//! NOTE: The schema (database) set as the default on the underlying `ClickHouse` client will be
//! set as the initial schema after calling `ClickHouseBuilder::build_catalog`. This can be changed
//! on the fly by calling `with_schema` on the returned catalog builder.
//!
//! ## Schema Coercion
//!
//! You can also specify whether "schema coercion" should occur during streaming of the data
//! returned by query execution. If you are using the full
//! [`super::context::ClickHouseSessionContext`] and [`super::context::ClickHouseQueryPlanner`]
//! (required if you plan on using `ClickHouse` functions), then the second argument of the
//! `clickhouse(...)` scalar UDFs specifies the expected return type.
//!
//! This means the return type must be exact, otherwise an arrow error will be returned. By
//! configuring schema coercion, anything that can be coerced via arrow will be coerced, based on
//! the expected schema of the `clickhouse` function mentioned above.
//!
//! Generally schema coercion should be avoided, as there is a non-zero cost. The cost is incurred
//! per `RecordBatch` as the results are streamed through `DataFusion`'s execution context, and can
//! be avoided by simply providing the correct return type to the `clickhouse` function. But, for
//! non-latency-sensitive use cases, coercion may provide convenience, especially when dealing
//! with `ClickHouse` higher-order functions.
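//!
//! A minimal sketch of enabling coercion (the endpoint is illustrative):
//!
//! ```ignore
//! // With coercion enabled, a result type arrow can coerce (e.g. Int32 -> Int64)
//! // is accepted even when the declared return type is not an exact match.
//! let builder = ClickHouseBuilder::new("localhost:9000").with_coercion(true);
//! ```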
//!
//! # `ClickHouseCatalogBuilder`
//!
//! After configuring the `ClickHouseBuilder`, a `ClickHouseCatalogBuilder` is obtained by calling
//! `ClickHouseBuilder::build_catalog`. The `ClickHouseCatalogBuilder` returned can then be used to
//! set the schema being targeted, create tables on the remote schema, register existing tables
//! under aliases, and refresh `DataFusion`'s internal catalog to keep the two in sync.
//!
//! Refer to the e2e tests in the repository for detailed examples of its usage, especially the
//! tests' associated "helper" functions. The `basic` example shows a basic usage of the
//! `ClickHouseBuilder` that suffices for most use cases.
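//!
//! A minimal end-to-end sketch (the endpoint, table name, engine string, and schema below are
//! illustrative, and the `&str`-to-engine conversion is assumed):
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! use datafusion::arrow::datatypes::{DataType, Field, Schema};
//! use datafusion::prelude::SessionContext;
//!
//! let ctx = SessionContext::new();
//!
//! // Build the catalog under the default name ("clickhouse").
//! let catalog = ClickHouseBuilder::new("localhost:9000")
//!     .build_catalog(&ctx, None)
//!     .await?;
//!
//! // Create a remote table, then refresh DataFusion's view of the catalog.
//! let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
//! let catalog = catalog
//!     .with_new_table("events", "MergeTree", schema)
//!     .create(&ctx)
//!     .await?;
//! let _provider = catalog.build(&ctx).await?;
//! ```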
use std::collections::HashMap;
use std::sync::Arc;

use clickhouse_arrow::prelude::ClickHouseEngine;
use clickhouse_arrow::{
    ArrowConnectionManager, ArrowConnectionPoolBuilder, ArrowOptions, ArrowPoolBuilder,
    ClientBuilder, CreateOptions, Destination,
};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::catalog::{CatalogProvider, TableProviderFactory};
use datafusion::common::{Constraints, DFSchema};
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::CreateExternalTable;
use datafusion::prelude::*;
use datafusion::sql::TableReference;
use tracing::{debug, error};

use crate::connection::ClickHouseConnectionPool;
use crate::providers::catalog::ClickHouseCatalogProvider;
use crate::providers::table_factory::ClickHouseTableFactory;
use crate::table_factory::ClickHouseTableProviderFactory;
use crate::utils::{self, ENDPOINT_PARAM, default_str_to_expr, register_builtins};

/// The default `DataFusion` catalog name if no other name is provided.
pub const DEFAULT_CLICKHOUSE_CATALOG: &str = "clickhouse";

/// Simple function to provide default [`ArrowOptions`] fit for common use.
pub fn default_arrow_options() -> ArrowOptions {
    ArrowOptions::default()
        .with_strings_as_strings(true)
        .with_strict_schema(false)
        .with_disable_strict_schema_ddl(true)
        .with_nullable_array_default_empty(true)
}

/// Entrypoint builder for `ClickHouse` and `DataFusion` integration.
///
/// Ensures `DataFusion` default UDFs as well as `ClickHouse` UDFs are registered in
/// [`SessionContext`], the pool is attached to the [`ClickHouseTableProviderFactory`], and
/// `ClickHouse` is reachable at the provided endpoint.
///
/// NOTE: While `clickhouse_arrow` defaults to binary encoding for strings (via
/// [`clickhouse_arrow::ArrowOptions::strings_as_strings`] == false), for `DataFusion` the
/// default is `true`. This can be disabled by modifying the setting via
/// [`ClickHouseBuilder::configure_arrow_options`].
pub struct ClickHouseBuilder {
    endpoint:          Destination,
    pool_builder:      ArrowConnectionPoolBuilder,
    factory:           ClickHouseTableProviderFactory,
    write_concurrency: Option<usize>,
}

impl ClickHouseBuilder {
    /// Create a new `ClickHouseBuilder` that can be used to configure both the
    /// [`clickhouse_arrow::ClientBuilder`] and the [`clickhouse_arrow::ArrowPoolBuilder`], and be
    /// "built" into a [`ClickHouseCatalogBuilder`] for interacting with `ClickHouse` databases and
    /// tables.
    pub fn new(endpoint: impl Into<Destination>) -> Self {
        let endpoint = endpoint.into();
        let pool_builder = ArrowConnectionPoolBuilder::new(endpoint.clone())
            .configure_client(|c| c.with_arrow_options(default_arrow_options()));
        Self {
            endpoint,
            pool_builder,
            factory: ClickHouseTableProviderFactory::new(),
            write_concurrency: None,
        }
    }

    /// Like [`ClickHouseBuilder::new`], but uses the provided
    /// [`clickhouse_arrow::ArrowConnectionPoolBuilder`] rather than constructing one from the
    /// endpoint.
    pub fn new_with_pool_builder(
        endpoint: impl Into<Destination>,
        pool_builder: ArrowConnectionPoolBuilder,
    ) -> Self {
        let endpoint = endpoint.into();
        Self {
            endpoint,
            pool_builder,
            factory: ClickHouseTableProviderFactory::new(),
            write_concurrency: None,
        }
    }

    /// Configure whether to coerce schemas to match expected schemas during query execution.
    /// Remember, there is a non-zero cost incurred per `RecordBatch`, and this is mainly useful
    /// for allowing looser return types when using `clickhouse` functions.
    #[must_use]
    pub fn with_coercion(mut self, coerce: bool) -> Self {
        self.factory = self.factory.with_coercion(coerce);
        self
    }

    /// Configure the write concurrency level for INSERT operations.
    ///
    /// This controls how many record batches can be written to `ClickHouse` concurrently during
    /// INSERT operations. Higher values may improve throughput for large bulk inserts but will
    /// increase memory usage and connection pool pressure.
    ///
    /// Default: 4 (matching clickhouse-arrow's current connection pool limit)
    ///
    /// # Note
    /// This configuration can be extended to integrate with `DataFusion`'s `SessionConfig` using
    /// the `ConfigExtension` trait for global defaults across multiple catalogs. The current
    /// implementation uses a builder-level approach, which serves most use cases efficiently.
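    ///
    /// A sketch (the endpoint and concurrency value are illustrative):
    /// ```ignore
    /// let builder = ClickHouseBuilder::new("localhost:9000").with_write_concurrency(8);
    /// ```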
    #[must_use]
    pub fn with_write_concurrency(mut self, concurrency: usize) -> Self {
        self.write_concurrency = Some(concurrency);
        self
    }

    /// Configure the underlying [`clickhouse_arrow::ClientBuilder`].
    #[must_use]
    pub fn configure_client(mut self, f: impl FnOnce(ClientBuilder) -> ClientBuilder) -> Self {
        self.pool_builder = self.pool_builder.configure_client(f);
        self
    }

    /// Configure the underlying [`clickhouse_arrow::ArrowPoolBuilder`].
    #[must_use]
    pub fn configure_pool(mut self, f: impl FnOnce(ArrowPoolBuilder) -> ArrowPoolBuilder) -> Self {
        self.pool_builder = self.pool_builder.configure_pool(f);
        self
    }

    /// `clickhouse_arrow` defaults to binary encoding for
    /// [`datafusion::arrow::datatypes::DataType::Utf8`] columns; for `DataFusion` the default is
    /// inverted. This method allows reverting to binary encoding for strings, among other
    /// configuration options.
    ///
    /// See: [`clickhouse_arrow::ArrowOptions`]
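    ///
    /// A sketch reverting to binary-encoded strings (the endpoint is illustrative):
    /// ```ignore
    /// let builder = ClickHouseBuilder::new("localhost:9000")
    ///     .configure_arrow_options(|o| o.with_strings_as_strings(false));
    /// ```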
    #[must_use]
    pub fn configure_arrow_options(mut self, f: impl FnOnce(ArrowOptions) -> ArrowOptions) -> Self {
        self.pool_builder = self.pool_builder.configure_client(|c| {
            let options = c.options().ext.arrow.unwrap_or(default_arrow_options());
            c.with_arrow_options(f(options))
        });
        self
    }

    /// Builds the catalog, ensuring `ClickHouse` builtins (such as nested functions) are
    /// registered, the provided pool is attached to a new factory, the `ClickHouse` endpoint is
    /// reachable, and the catalog is created and registered to the [`SessionContext`].
    ///
    /// NOTE: The `pool_identifier` is used when `federation` is enabled. It indicates whether the
    /// underlying connection references the same connection or not.
    ///
    /// # Errors
    /// - Returns an error if the `ClickHouse` endpoint is unreachable
    /// - Returns an error if the `ClickHouse` catalog fails to be created
    pub async fn build_catalog_from_pool(
        ctx: &SessionContext,
        endpoint: impl Into<String>,
        catalog: Option<&str>,
        pool_identifier: impl Into<String>,
        pool: clickhouse_arrow::bb8::Pool<ArrowConnectionManager>,
    ) -> Result<ClickHouseCatalogBuilder> {
        // Register built-in functions and clickhouse UDFs
        register_builtins(ctx);

        let catalog = catalog.unwrap_or(DEFAULT_CLICKHOUSE_CATALOG).to_string();
        debug!(catalog, "Attaching pool to ClickHouse table factory");

        let endpoint = endpoint.into();
        let factory = ClickHouseTableProviderFactory::new();
        let pool = Arc::new(factory.attach_pool(&endpoint, pool_identifier, pool));

        ClickHouseCatalogBuilder::try_new(ctx, catalog, "", endpoint, pool, factory).await
    }

    /// Builds the catalog, ensuring `ClickHouse` builtins (such as nested functions) are
    /// registered, the pool is attached to the factory, the `ClickHouse` endpoint is reachable,
    /// and the catalog is created and registered to the [`SessionContext`].
    ///
    /// # Errors
    /// - Returns an error if the `ClickHouse` endpoint is unreachable
    /// - Returns an error if the `ClickHouse` catalog fails to be created
    pub async fn build_catalog(
        self,
        ctx: &SessionContext,
        catalog: Option<&str>,
    ) -> Result<ClickHouseCatalogBuilder> {
        // Register built-in functions and clickhouse UDFs
        register_builtins(ctx);

        let catalog = catalog.unwrap_or(DEFAULT_CLICKHOUSE_CATALOG).to_string();
        let database = self.pool_builder.client_options().default_database.clone();
        debug!(catalog, database, "Attaching pool to ClickHouse table factory");

        let endpoint = self.endpoint.to_string();
        let factory = self.factory;
        let mut pool = factory.attach_pool_builder(self.endpoint, self.pool_builder).await?;

        // Configure write concurrency if specified
        if let Some(concurrency) = self.write_concurrency {
            pool = pool.with_write_concurrency(concurrency);
        }
        let pool = Arc::new(pool);

        ClickHouseCatalogBuilder::try_new(ctx, catalog, database, endpoint, pool, factory).await
    }
}

/// [`ClickHouseCatalogBuilder`] can be used to create tables, register existing tables, and
/// finally refresh the `ClickHouse` catalog in `DataFusion`.
///
/// IMPORTANT! After creating tables, one of the build variations, i.e. `Self::build` or
/// `Self::build_schema`, must be called to ensure the catalog provider is up to date with the
/// remote `ClickHouse` database. If you forget to do this, `DataFusion` queries targeting one of
/// these tables will fail.
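///
/// A sketch of the create-then-build flow (names are illustrative):
/// ```ignore
/// let catalog = catalog.with_new_table("events", engine, schema).create(&ctx).await?;
/// let _provider = catalog.build(&ctx).await?; // skipping this leaves `events` unqueryable
/// ```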
#[derive(Clone)]
pub struct ClickHouseCatalogBuilder {
    catalog:  String,
    /// The current schema the builder is targeting.
    schema:   String,
    /// The configured remote endpoint the underlying `ClickHouse` pool is connected to.
    endpoint: String,
    /// The `ClickHouse` connection used to communicate with the remote `ClickHouse` database.
    pool:     Arc<ClickHouseConnectionPool>,
    /// This factory is used to create new tables in the remote `ClickHouse` database. This builder
    /// must be built with one of the builder variations, i.e. `Self::build` or
    /// `Self::build_schema`, so that the provider reflects the most current remote schema.
    factory:  ClickHouseTableProviderFactory,
    /// The catalog provider is a passive catalog provider, meaning it must be "refreshed" after
    /// table creation or schema change of any type to the remote `ClickHouse` database.
    provider: Arc<ClickHouseCatalogProvider>,
}

impl ClickHouseCatalogBuilder {
    /// Internal constructor: ensures the schema exists, creates the catalog provider, and
    /// registers it with the provided [`SessionContext`].
    async fn try_new(
        ctx: &SessionContext,
        catalog: impl Into<String>,
        default_schema: impl Into<String>,
        endpoint: impl Into<String>,
        pool: Arc<ClickHouseConnectionPool>,
        factory: ClickHouseTableProviderFactory,
    ) -> Result<Self> {
        let schema = default_schema.into();
        let catalog = catalog.into();
        let endpoint = endpoint.into();

        // Must have a schema
        let schema = if schema.is_empty() { "default".to_string() } else { schema };

        // Ensure the database exists if not default
        if schema != "default" {
            debug!(schema, "Database not default, attempting create");
            utils::create_database(&schema, &pool).await?;
        }

        // Register catalog or create a new one
        let provider = if factory.coerce_schemas() {
            ClickHouseCatalogProvider::try_new_with_coercion(Arc::clone(&pool)).await
        } else {
            ClickHouseCatalogProvider::try_new(Arc::clone(&pool)).await
        }
        .inspect_err(|error| error!(?error, "Failed to register catalog {catalog}"))?;

        let provider = Arc::new(provider);

        drop(
            ctx.register_catalog(
                catalog.as_str(),
                Arc::clone(&provider) as Arc<dyn CatalogProvider>,
            ),
        );

        Ok(ClickHouseCatalogBuilder { catalog, schema, endpoint, pool, factory, provider })
    }

    /// Return the name of the catalog in `DataFusion`'s context that this builder is configuring.
    pub fn name(&self) -> &str { &self.catalog }

    /// Return the currently set schema (database) being targeted. Can be changed on the fly by
    /// calling `Self::with_schema`.
    pub fn schema(&self) -> &str { &self.schema }

    /// Update the current "schema" (database) that this builder is targeting, and continue
    /// building.
    ///
    /// # Errors
    /// - Returns an error if the schema needs to be created and creation fails
    pub async fn with_schema(mut self, name: impl Into<String>) -> Result<Self> {
        let name = name.into();
        // Don't re-create if the schema is already set
        if name == self.schema {
            return Ok(self);
        }
        self.schema = name;

        // Ensure the database exists if not default
        if self.schema != "default" {
            debug!(schema = self.schema, "Database not default, attempting create");
            utils::create_database(&self.schema, &self.pool).await?;
        }

        // Refresh the catalog so the new schema is visible
        self.provider.refresh_catalog(&self.pool).await?;

        Ok(self)
    }

    /// Create a new table in the remote `ClickHouse` instance.
    ///
    /// # Arguments
    /// - `name`: The name of the table to create.
    /// - `engine`: The engine to use for the table.
    /// - `schema`: The schema of the table.
    ///
    /// # Returns
    /// - A [`ClickHouseTableCreator`] that can be used to create tables in the remote `ClickHouse`
    ///   instance.
    pub fn with_new_table(
        &self,
        name: impl Into<String>,
        engine: impl Into<ClickHouseEngine>,
        schema: SchemaRef,
    ) -> ClickHouseTableCreator {
        let table = name.into();
        let options = CreateOptions::new(engine.into().to_string());
        debug!(schema = self.schema, table, ?options, "Initializing table creator");
        ClickHouseTableCreator {
            name: table,
            builder: self.clone(),
            options,
            schema,
            replace: false,
        }
    }

    /// Create a new table in the remote `ClickHouse` instance.
    ///
    /// # Arguments
    /// - `name`: The name of the table to create.
    /// - `schema`: The schema of the table.
    /// - `options`: More detailed [`CreateOptions`] for creating the provided table.
    ///
    /// # Returns
    /// - A [`ClickHouseTableCreator`] that can be used to create tables in the remote `ClickHouse`
    ///   instance.
    pub fn with_new_table_and_options(
        &self,
        name: impl Into<String>,
        schema: SchemaRef,
        options: CreateOptions,
    ) -> ClickHouseTableCreator {
        let table = name.into();
        debug!(schema = self.schema, table, ?options, "Initializing table creator");
        ClickHouseTableCreator {
            name: table,
            builder: self.clone(),
            options,
            schema,
            replace: false,
        }
    }

    /// Register an existing `ClickHouse` table, optionally renaming it in the provided session
    /// state.
    ///
    /// # Errors
    /// - Returns an error if the table does not exist in the remote database
    /// - Returns an error if the table cannot be registered to the context
    pub async fn register_existing_table(
        &self,
        name: impl Into<TableReference>,
        name_as: Option<impl Into<TableReference>>,
        ctx: &SessionContext,
    ) -> Result<()> {
        let name = name.into();
        let database = name.schema().unwrap_or(&self.schema);
        let exists =
            self.pool.connect().await?.tables(database).await?.contains(&name.table().to_string());

        if !exists {
            return Err(DataFusionError::Plan(format!(
                "Table '{name}' does not exist in ClickHouse database '{database}', use \
                 `table_creator` instead"
            )));
        }

        let table = TableReference::full(self.catalog.as_str(), database, name.table());
        let table_as = name_as.map(Into::into).unwrap_or(table.clone());

        let factory = ClickHouseTableFactory::new(Arc::clone(&self.pool));
        let provider = factory.table_provider(table).await?;
        debug!(?table_as, "Registering ClickHouse table provider");
        drop(ctx.register_table(table_as, provider)?);

        Ok(())
    }

    /// Build the current `schema` (database) being managed by this catalog, optionally providing
    /// a new schema to continue building with.
    ///
    /// Note: For the `SessionContext` to recognize the added tables and updated schema, either
    /// this function or `Self::build` must be called.
    ///
    /// # Errors
    /// - Returns an error if an error occurs while refreshing the catalog
    pub async fn build_schema(
        mut self,
        new_schema: Option<String>,
        ctx: &SessionContext,
    ) -> Result<Self> {
        let _catalog = self.build_internal(ctx).await?;
        self.schema = new_schema.unwrap_or(self.schema);
        Ok(self)
    }

    /// Re-register the catalog, updating the [`SessionContext`], and return the refreshed
    /// [`ClickHouseCatalogProvider`].
    ///
    /// Note: Important! For the [`SessionContext`] to recognize the added tables and updated
    /// schema, either this function or `Self::build_schema` must be called.
    ///
    /// # Errors
    /// - Returns an error if an error occurs while refreshing the catalog
    /// - Returns an error if the "federation" feature is enabled but the context is not federated
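    ///
    /// A sketch with the `federation` feature enabled (assumes `ctx.federate()` yields a
    /// federated `SessionContext`):
    /// ```ignore
    /// use clickhouse_datafusion::federation::FederatedContext as _;
    ///
    /// let ctx = ctx.federate(); // federate the context before building
    /// let provider = catalog_builder.build(&ctx).await?;
    /// ```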
    pub async fn build(&self, ctx: &SessionContext) -> Result<Arc<ClickHouseCatalogProvider>> {
        #[cfg(feature = "federation")]
        {
            use datafusion::common::exec_err;

            use crate::federation::FederatedContext as _;

            if !ctx.is_federated() {
                return exec_err!(
                    "Building this schema with federation enabled but no federated SessionContext \
                     will fail. Call `ctx.federate()` before providing a context to build with."
                );
            }
        }

        self.build_internal(ctx).await
    }

    /// Re-registers the catalog provider, re-configures the [`SessionContext`], and returns a
    /// clone of the catalog.
    ///
    /// # Errors
    /// - Returns an error if an error occurs while refreshing the catalog
    async fn build_internal(&self, ctx: &SessionContext) -> Result<Arc<ClickHouseCatalogProvider>> {
        let catalog = Arc::clone(&self.provider);
        debug!(catalog = self.catalog, "ClickHouse catalog created");

        // Re-register UDFs
        register_builtins(ctx);

        // Re-register catalog
        catalog.refresh_catalog(&self.pool).await?;
        drop(ctx.register_catalog(&self.catalog, Arc::clone(&catalog) as Arc<dyn CatalogProvider>));

        Ok(catalog)
    }
}

/// Builder phase for creating `ClickHouse` tables.
#[derive(Clone)]
pub struct ClickHouseTableCreator {
    name:    String,
    builder: ClickHouseCatalogBuilder,
    schema:  SchemaRef,
    options: CreateOptions,
    /// Whether the create external table command will replace an existing table.
    replace: bool,
}

impl ClickHouseTableCreator {
    /// Update the underlying table create options that will be passed to `ClickHouse`.
    ///
    /// See [`CreateOptions`] for more information.
    #[must_use]
    pub fn update_create_options(
        mut self,
        update: impl Fn(CreateOptions) -> CreateOptions,
    ) -> Self {
        self.options = update(self.options);
        self
    }

    /// Set whether `DataFusion` will replace an existing table if it already exists.
    #[must_use]
    pub fn set_or_replace(mut self, replace: bool) -> Self {
        self.replace = replace;
        self
    }

    /// Create the table, returning back a [`ClickHouseCatalogBuilder`] to create more tables.
    ///
    /// # Errors
    /// - Returns an error if the `TableProviderFactory` fails to create the table
    /// - Returns an error if an error occurs while refreshing the catalog
    pub async fn create(self, ctx: &SessionContext) -> Result<ClickHouseCatalogBuilder> {
        let schema = self.builder.schema.clone();
        let table = self.name;

        let column_defaults = self
            .options
            .clone()
            .defaults
            .unwrap_or_default()
            .into_iter()
            .map(|(col, value)| (col, default_str_to_expr(&value)))
            .collect::<HashMap<_, _>>();

        let mut options = utils::create_options_to_params(self.options).into_params();
        drop(options.insert(ENDPOINT_PARAM.into(), self.builder.endpoint.clone()));

        let table_ref = TableReference::partial(schema.as_str(), table.as_str());
        let cmd = CreateExternalTable {
            name: table_ref.clone(),
            schema: Arc::new(DFSchema::try_from(Arc::clone(&self.schema))?),
            options,
            column_defaults,
            or_replace: self.replace,
            constraints: Constraints::default(),
            table_partition_cols: vec![],
            if_not_exists: false,
            location: String::new(),
            file_type: String::new(),
            temporary: false,
            definition: None,
            order_exprs: vec![],
            unbounded: false,
        };
        let _provider = self
            .builder
            .factory
            .create(&ctx.state(), &cmd)
            .await
            .inspect_err(|error| error!(?error, table, "Factory error creating table"))?;
        debug!(table, "Table created, catalog will be refreshed in `build`");

        // Refresh the catalog after creating the table
        drop(self.builder.build_internal(ctx).await?);

        Ok(self.builder)
    }
}