§🚇 ClickHouse DataFusion Integration
A high-performance Rust library that integrates ClickHouse with Apache DataFusion, enabling seamless querying across ClickHouse and other data sources.
Built on clickhouse-arrow for optimal performance and DataFusion for advanced SQL analytics.
§Why clickhouse-datafusion?
- 🚀 High Performance: Built on clickhouse-arrow for optimal data transfer and Arrow format efficiency
- ⚡ Connection Pooling: clickhouse-arrow provides connection pooling for scalability
- 🔗 Federation Support: Join ClickHouse tables with other DataFusion sources seamlessly
- ⛓️ Two-tier Execution: Delegate complexity to ClickHouse, leveraging additional optimizations at the edge
- 🛠️ ClickHouse UDFs: Direct access to ClickHouse functions in DataFusion SQL queries
- 📊 Advanced Analytics: Support for window functions, CTEs, subqueries, and complex JOINs
- 🎯 Arrow Native: Native Apache Arrow integration for zero-copy data processing
- 🔄 Schema Flexibility: Optional schema coercion after fetching data for automatic type compatibility
§Quick Start
Add to your Cargo.toml:
[dependencies]
clickhouse-datafusion = "0.1.5"

§Basic Usage
use std::sync::Arc;

use clickhouse_arrow::prelude::ClickHouseEngine;
use clickhouse_datafusion::{ClickHouseBuilder, ClickHouseSessionContext};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::prelude::SessionContext;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create DataFusion context with ClickHouse UDF support
let ctx = ClickHouseSessionContext::from(SessionContext::new());
// Build ClickHouse integration
let clickhouse = ClickHouseBuilder::new("http://localhost:9000")
.configure_client(|c| c.with_username("clickhouse"))
.configure_arrow_options(|opts| opts.with_strings_as_strings(true))
.build_catalog(&ctx, Some("clickhouse"))
.await?;
// Define schema for test table
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
// Set schema (database name)
let clickhouse = clickhouse.with_schema("my_database").await?;
// Register existing table
let clickhouse = clickhouse.register_existing_table("my_table").await?;
// Create a new table on the remote server and get the catalog builder back
let clickhouse = clickhouse
.with_new_table("new_table", ClickHouseEngine::MergeTree, schema)
.create(ctx)
.await?;
// Finally build the catalog so the changes take effect (in DataFusion)
let _catalog = clickhouse.build(&ctx).await?;
// Query ClickHouse tables
let df = ctx.sql("SELECT * FROM clickhouse.my_database.my_table LIMIT 10").await?;
datafusion::arrow::util::pretty::print_batches(&df.collect().await?)?;
Ok(())
}

§Preserving custom SessionContext configuration
If you add bespoke configuration to a SessionContext before wrapping it (for example adding
custom catalogs or enabling SessionContext::enable_url_table so URLs can be queried directly),
use ClickHouseSessionContext::with_session_transform
to re-apply those changes after the ClickHouse-specific rebuild:
use datafusion::prelude::SessionContext;
use clickhouse_datafusion::ClickHouseSessionContext;
let base = SessionContext::new().enable_url_table();
let ctx = ClickHouseSessionContext::from(base)
.with_session_transform(|ctx| ctx.enable_url_table());

This keeps your customisation intact while the library layers in its own query planner and UDFs.
§With Federation (Cross-DBMS Queries)
use std::sync::Arc;

use clickhouse_arrow::prelude::ClickHouseEngine;
use clickhouse_datafusion::federation::FederatedContext;
use clickhouse_datafusion::{ClickHouseBuilder, ClickHouseSessionContext};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::prelude::{ParquetReadOptions, SessionContext};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Enable federation for cross-database queries
let ctx = SessionContext::new().federate();
// Optionally enable the full ClickHouseSessionContext with ClickHouseQueryPlanner
//
// NOTE: Not required, provides a custom QueryPlanner and Analyzer with optional federation.
let ctx = ClickHouseSessionContext::from(ctx);
// Configure the underlying connection and initialize the ClickHouse catalog
let clickhouse = ClickHouseBuilder::new("http://localhost:9000")
// The default database set on the client is used initially by the returned catalog builder.
.configure_client(|c| c.with_username("clickhouse").with_default_database("other_db"))
.build_catalog(&ctx, Some("clickhouse")) // default catalog name
.await?;
// Create a table on the remote server and build the catalog (so changes are registered in
// DataFusion).
//
// The 'clickhouse' catalog returned from the above builder can continue to be used to create as
// many tables as needed. But remember to always `build` the catalog when you want to interact
// with it via DataFusion queries, using either 'build' or 'build_schema'.
let _clickhouse_catalog_provider = clickhouse
// Change the default database from 'other_db' to 'analytics'
.with_schema("analytics")
.await?
// Create users table on the database 'analytics'
.with_new_table(
// Table name
"user_events",
// Engine - ClickHouseEngine::default() could be used as well.
ClickHouseEngine::MergeTree,
// Define the schema for user_events table
Arc::new(Schema::new(vec![
Field::new("user_id", DataType::Int32, false),
Field::new("event_count", DataType::UInt32, false),
])),
)
// Set `clickhouse_arrow::CreateOptions` for `user_events` table
.update_create_options(|opts| opts.with_order_by(&["user_id".into()]))
// Finally create the table
.create(ctx)
.await?
// And then "build" the catalog, synchronizing the remote schema with DataFusion
.build(&ctx)
.await?;
// Register other data sources (Parquet, CSV, etc.)
ctx.register_parquet("local_data", "data.parquet", ParquetReadOptions::default()).await?;
// Join across different data sources
let df = ctx.sql("
SELECT ch.user_id,
ch.event_count,
local.user_name
FROM clickhouse.analytics.user_events ch
JOIN local_data local
ON ch.user_id = local.user_id
WHERE ch.event_count > 100
").await?;
datafusion::arrow::util::pretty::print_batches(&df.collect().await?)?;
Ok(())
}

§ClickHouse Functions
Access ClickHouse’s powerful functions directly in DataFusion SQL:
§Direct Function Calls
-- Mathematical functions
SELECT clickhouse(sigmoid(price), 'Float64') as price_sigmoid FROM products;
-- String functions
SELECT clickhouse(`base64Encode`(name), 'Utf8') as b64_name FROM users;
-- Array functions
SELECT clickhouse(`arrayJoin`(tags), 'Utf8') as individual_tags FROM articles;

§Lambda Functions
-- Transform arrays with lambda functions
SELECT
names,
clickhouse(`arrayMap`($x, concat($x, '_processed'), names), 'List(Utf8)') as processed_names
FROM user_data;
-- Filter arrays
SELECT clickhouse(`arrayFilter`($x, length($x) > 3, tags), 'List(Utf8)') as long_tags
FROM content;

§Complex Analytics
-- Window functions with ClickHouse functions
SELECT
user_id,
clickhouse(exp(revenue), 'Float64') as exp_revenue,
SUM(revenue) OVER (PARTITION BY user_id ORDER BY date) as running_total
FROM sales_data;
-- CTEs with ClickHouse functions
WITH processed_data AS (
SELECT
user_id,
clickhouse(`arrayJoin`(event_types), 'Utf8') as event_type
FROM user_events
)
SELECT event_type, COUNT(*) as event_count
FROM processed_data
GROUP BY event_type;

§Architecture
§Core Components
- ClickHouseBuilder: Main configuration entry point
- ClickHouseSessionContext: Enhanced DataFusion context with ClickHouse UDF support
- Table Providers: DataFusion integration layer for ClickHouse tables
- Federation: Cross-database query support via datafusion-federation
- UDF System: ClickHouse function integration with intelligent pushdown
- Function Analyzer: Advanced optimization for UDF placement
§Key Features
§Connection Pooling
let clickhouse = ClickHouseBuilder::new("http://localhost:9000")
.configure_pool(|pool| pool.max_size(10))
.configure_client(|client| client.with_compression(CompressionMethod::LZ4))
.build_catalog(&ctx, None)
.await?;

§Schema Coercion
let builder = ClickHouseBuilder::new("http://localhost:9000")
.with_schema_coercion(true) // Enable automatic type coercion
.build_catalog(&ctx, None)
.await?;

§Schema Management
// Create tables from Arrow schema
let schema = Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, false),
]);
clickhouse.with_new_table("users", "MergeTree".into(), Arc::new(schema)).create(&ctx).await?;

§Federation Support
When the federation feature is enabled, clickhouse-datafusion can join ClickHouse tables with other DataFusion sources:
Note: The current release uses datafusion-federation v0.4.7 from crates.io for publishing compatibility. This version has a known issue with UNNEST operations due to an upstream DataFusion bug in expression handling. If you need UNNEST support in federated queries, please track PR #135 for the fix.
-- Join ClickHouse with Parquet files
SELECT
ch.user_id,
ch.total_purchases,
parquet.user_segment
FROM clickhouse.analytics.user_stats ch
JOIN local_parquet.user_segments parquet
ON ch.user_id = parquet.user_id
WHERE ch.total_purchases > 1000;
-- Federated aggregations
SELECT
segment,
AVG(clickhouse(log(total_purchases), 'Float64')) as avg_log_purchases
FROM (
SELECT
ch.user_id,
ch.total_purchases,
csv.segment
FROM clickhouse.sales.users ch
JOIN local_csv.segments csv ON ch.user_id = csv.user_id
)
GROUP BY segment;

§Features
- default: Core functionality with ClickHouse integration
- federation: Enable cross-database queries
- cloud: ClickHouse Cloud support
- test-utils: Testing utilities for development
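For instance, a minimal Cargo.toml sketch enabling cross-database queries via the federation feature (version pinned to match the Quick Start above):

```toml
[dependencies]
clickhouse-datafusion = { version = "0.1.5", features = ["federation"] }
```

With this flag set, the `clickhouse_datafusion::federation` module and the `.federate()` extension shown earlier become available.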
§Development
§Running Tests
# All tests with multiple feature combinations
just test
# Specific test types
just test-e2e # End-to-end tests
just test-federation # Federation tests
just test-unit # Unit tests only
# Coverage reports
just coverage # HTML report
just coverage-lcov # LCOV for CI

§Environment Variables
- RUST_LOG=debug: Enable debug logging
- DISABLE_CLEANUP=true: Keep test containers running
- DISABLE_CLEANUP_ON_ERROR=true: Keep containers on test failure
§Examples
See the examples directory for a complete working example:
- Basic Integration: Simple ClickHouse querying
- Federation: Cross-database joins
- UDF Usage: ClickHouse function examples
- Schema Management: Table creation and management
§Contributing
Contributions are welcome! Please see CONTRIBUTING.md for guidelines.
§License
Licensed under the Apache License, Version 2.0. See LICENSE for details.
Re-exports§
pub use providers::*;
Modules§
- analyzer: A DataFusion AnalyzerRule that identifies the largest subtree of a plan to wrap with an extension node, and "pushes down" ClickHouse functions when required
- dialect: A custom Unparser Dialect for ClickHouse.
- plan_node: A UserDefinedLogicalNodeCore implementation for wrapping the largest sub-trees of a DataFusion logical plan for execution on ClickHouse directly.
- planner: An ExtensionPlanner implementation for executing ClickHouseFunctionNodes
- prelude: To simplify compatibility, crates for clickhouse_arrow, datafusion, and datafusion::arrow are re-exported.
- providers
- sql: ClickHouse SQL DataFusion TableProvider
- stream
- udfs: Various UDFs providing DataFusion's SQL parsing with some ClickHouse-specific functionality.
- utils: Helpers for bridging between ClickHouse, DataFusion, and clickhouse_arrow.
Structs§
- ClickHouseBuilder: Entrypoint builder for ClickHouse and DataFusion integration.
- ClickHouseCatalogBuilder: Can be used to create tables, register existing tables, and finally refresh the ClickHouse catalog in DataFusion.
- ClickHouseConnection: A wrapper around ArrowPoolConnection that provides additional functionality relevant for DataFusion.
- ClickHouseConnectionPool: A wrapper around a clickhouse_arrow::ConnectionPool<ArrowFormat>
- ClickHouseContextProvider: A custom ContextProvider
- ClickHouseDataSink: A datafusion::datasource::sink::DataSink for ClickHouse
- ClickHouseQueryPlanner: A custom QueryPlanner leveraging the ClickHouseExtensionPlanner
- ClickHouseSessionContext: Wrapper for SessionContext which allows running arbitrary ClickHouse functions.
- ClickHouseTableCreator: Builder phase for creating ClickHouse tables.
Constants§
- DEFAULT_CLICKHOUSE_CATALOG: The default DataFusion catalog name if no other name is provided.
Functions§
- configure_analyzer_rules: Given a SessionState, configure the analyzer rules for the ClickHouse session context.
- default_arrow_options: Simple function to provide default ArrowOptions fit for common use
- prepare_session_context: Convenience method (opinionated) for preparing a session context both with federation, if the feature is enabled, and with UDF pushdown support through the custom Analyzer. It is called in ClickHouseSessionContext::new and ClickHouseSessionContext::from when creating a new ClickHouseSessionContext.
Type Aliases§
- ArrowPool
- ArrowPoolConnection: Type alias for a pooled connection to a ClickHouse database.