stately_arrow/database/
clickhouse.rs

1//! `ClickHouse` database backend implementation.
2//!
3//! This module provides integration with [ClickHouse](https://clickhouse.com/) databases,
4//! enabling SQL query execution and schema introspection through the unified backend interface.
5//!
6//! # Features
7//!
8//! - Connection pooling with configurable pool sizes and timeouts
9//! - Arrow-native data transfer for efficient query results
10//! - Automatic catalog registration with `DataFusion`
11//! - Compression support (LZ4, ZSTD)
12//! - TLS configuration
13//!
14//! # Example
15//!
16//! ```rust,ignore
17//! use stately_arrow::database::{ConnectionOptions, PoolOptions, clickhouse::ClickHouseBackend};
18//!
19//! let options = ConnectionOptions {
20//!     endpoint: "http://localhost:8123".to_string(),
21//!     username: "default".to_string(),
22//!     password: None,
23//!     tls: None,
24//! };
25//!
26//! let backend = ClickHouseBackend::try_new(
27//!     "my-clickhouse",
28//!     "My ClickHouse",
29//!     &options,
30//!     None,
31//!     PoolOptions::default(),
32//! ).await?;
33//! ```
34
35use std::collections::BTreeMap;
36use std::hash::{BuildHasher, Hash, RandomState};
37use std::sync::atomic::{AtomicBool, Ordering};
38use std::sync::{Arc, LazyLock};
39use std::time::Duration;
40
41use async_trait::async_trait;
42use clickhouse_datafusion::prelude::clickhouse_arrow::{
43    ArrowClient, ArrowFormat, ArrowOptions, ClientBuilder, CompressionMethod, ConnectionManager,
44    ConnectionPool, bb8,
45};
46use clickhouse_datafusion::{ClickHouseBuilder, ClickHouseSessionContext};
47use datafusion::error::DataFusionError;
48use datafusion::execution::context::SessionContext;
49use datafusion::prelude::{DataFrame, SessionConfig};
50use serde::{Deserialize, Serialize};
51use tracing::debug;
52
53use super::{ConnectionOptions, PoolOptions, Secret};
54use crate::backend::{Backend, BackendMetadata, Capability, ConnectionKind, ConnectionMetadata};
55use crate::context::{DEFAULT_SESSION_CAPABILITIES, QuerySession, SessionCapability};
56use crate::error::{Error, Result};
57use crate::response::{ListSummary, TableSummary};
58
59static CLICKHOUSE_METADATA: LazyLock<BackendMetadata> = LazyLock::new(|| BackendMetadata {
60    kind:         ConnectionKind::Database,
61    capabilities: vec![Capability::ExecuteSql, Capability::List],
62});
63
/// Catalog name under which `ClickHouse` is registered in `DataFusion` and which is advertised
/// in the backend's [`ConnectionMetadata`].
pub const CLICKHOUSE_CATALOG: &str = "clickhouse";
65
66/// Additional ClickHouse-specific configuration.
67#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema)]
68pub struct ClickHouseConfig {
69    #[serde(default)]
70    pub settings:    BTreeMap<String, String>,
71    #[serde(skip_serializing_if = "Option::is_none")]
72    pub compression: Option<ClickHouseCompression>,
73}
74
75/// Compression options for `ClickHouse` tables.
76#[derive(
77    Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema,
78)]
79#[serde(rename_all = "snake_case")]
80pub enum ClickHouseCompression {
81    None,
82    #[default]
83    Lz4,
84    Zstd,
85}
86
87/// Wrapper for `ClickHouseSessionContext` to allow implementing `QuerySession` with `ClickHouse`
88/// capabilities. This is needed in order to allow function pushdown and `ClickHouse` semantics
89/// during `DataFusion` query execution.
90#[derive(Clone)]
91pub struct QuerySessionContext(ClickHouseSessionContext);
92
93impl QuerySessionContext {
94    /// Create a new `QuerySessionContext` wrapped with `ClickHouse` capabilities.
95    ///
96    /// NOTE: The settings applied below are due to [arrow-js](https://github.com/apache/arrow-js)
97    ///       missing support for "View" types.
98    ///
99    /// Once a PR is merged that enables support, those settings can be removed.
100    /// The PR currently tracking this is [#311](https://github.com/apache/arrow-js/pull/311)
101    pub fn new() -> Self {
102        let config = SessionConfig::default()
103            .with_information_schema(true)
104            // Binary is not very useful to the UI
105            .set_bool("datafusion.execution.parquet.binary_as_string", true);
106        let session = SessionContext::new_with_config(config);
107        Self(
108            ClickHouseSessionContext::from(session)
109                .with_session_transform(SessionContext::enable_url_table),
110        )
111    }
112}
113
114impl Default for QuerySessionContext {
115    fn default() -> Self { Self::new() }
116}
117
118#[async_trait]
119impl QuerySession for QuerySessionContext {
120    fn as_session(&self) -> &SessionContext { self.0.session_context() }
121
122    fn capabilities(&self) -> &[SessionCapability] { DEFAULT_SESSION_CAPABILITIES }
123
124    async fn sql(&self, sql: &str) -> Result<DataFrame> {
125        self.0.sql(sql).await.map_err(Error::DataFusion)
126    }
127}
128
129/// Backend implementation for `ClickHouse` connectors.
130pub struct ClickHouseBackend {
131    key:        String,
132    metadata:   ConnectionMetadata,
133    endpoint:   String,
134    /// An `ArrowConnectionManager` is stored so that the underlying connection can be used while
135    /// at the same time allowing registration into `DataFusion`.
136    pool:       ConnectionPool<ArrowFormat>,
137    registered: Arc<AtomicBool>,
138}
139
140impl ClickHouseBackend {
141    /// Create a new `ClickHouseBackend`.
142    ///
143    /// # Errors
144    /// - Returns an error if the connection pool cannot be created.
145    pub async fn try_new(
146        id: impl Into<String>,
147        name: impl Into<String>,
148        options: &ConnectionOptions,
149        config: Option<ClickHouseConfig>,
150        connect: PoolOptions,
151    ) -> Result<Self> {
152        let metadata = ConnectionMetadata {
153            id:       id.into(),
154            name:     name.into(),
155            catalog:  Some(CLICKHOUSE_CATALOG.to_string()),
156            metadata: CLICKHOUSE_METADATA.clone(),
157        };
158
159        let key = RandomState::new().hash_one(&metadata).to_string();
160        let endpoint = options.endpoint.clone();
161        let config = config.unwrap_or_default();
162        let builder = create_client_builder("", options, &config);
163        let manager = ConnectionManager::try_new_with_builder(builder)
164            .await
165            .map_err(Error::ClickHouseArrow)?;
166        let pool_size = connect.pool_size.unwrap_or(1);
167        let timeout = connect.connect_timeout.map_or(30, u64::from);
168        let pool = bb8::Builder::new()
169            .max_size(pool_size)
170            .min_idle(pool_size)
171            .test_on_check_out(true)
172            .max_lifetime(Duration::from_secs(60 * 60 * 2))
173            .idle_timeout(Duration::from_secs(60 * 60 * 2))
174            .connection_timeout(Duration::from_secs(timeout))
175            .retry_connection(false)
176            .queue_strategy(bb8::QueueStrategy::Fifo)
177            .build(manager)
178            .await
179            .map_err(|e| DataFusionError::External(e.into()))
180            .map_err(Error::ClickHouseDatafusion)?;
181
182        Ok(Self { key, metadata, endpoint, pool, registered: Arc::new(AtomicBool::new(false)) })
183    }
184
185    pub fn metadata() -> BackendMetadata { CLICKHOUSE_METADATA.clone() }
186}
187
188#[async_trait]
189impl Backend for ClickHouseBackend {
190    fn connection(&self) -> &ConnectionMetadata { &self.metadata }
191
192    async fn prepare_session(&self, session: &SessionContext) -> Result<()> {
193        if self.registered.load(Ordering::Acquire) {
194            debug!("ClickHouse already registered, skipping registration");
195            return Ok(());
196        }
197
198        let pool = self.pool.clone();
199        let _clickhouse = ClickHouseBuilder::build_catalog_from_pool(
200            session,
201            &self.endpoint,
202            Some(CLICKHOUSE_CATALOG),
203            &self.key,
204            pool,
205        )
206        .await
207        .map_err(Error::ClickHouseDatafusion)?
208        .build(session)
209        .await
210        .map_err(Error::ClickHouseDatafusion)?;
211
212        // Flag that connection has been registered
213        self.registered.store(true, Ordering::Release);
214
215        Ok(())
216    }
217
218    async fn list(&self, database: Option<&str>) -> Result<ListSummary> {
219        if database.is_some_and(|d| !d.is_empty()) {
220            self.pool
221                .get()
222                .await
223                .map_err(|e| DataFusionError::External(e.into()))
224                .map_err(Error::ClickHouseDatafusion)?
225                .fetch_tables(database, None)
226                .await
227                .map_err(Error::from)
228                .map(|name| {
229                    name.into_iter()
230                        .map(|name| TableSummary { name, rows: None, size_bytes: None })
231                        .collect()
232                })
233                .map(ListSummary::Tables)
234                .inspect(|result| {
235                    tracing::debug!("------> ClickHouse list w/ database: {result:?}");
236                })
237        } else {
238            self.pool
239                .get()
240                .await
241                .map_err(|e| DataFusionError::External(e.into()))
242                .map_err(Error::ClickHouseDatafusion)?
243                .fetch_schemas(None)
244                .await
245                .map_err(Error::from)
246                .map(ListSummary::Databases)
247                .inspect(|result| {
248                    tracing::debug!("------> ClickHouse list w/o database: {result:?}");
249                })
250        }
251    }
252}
253
254/// Create a `ClickHouse` client builder, Arrow format.
255pub fn create_client_builder(
256    catalog: &str,
257    options: &ConnectionOptions,
258    config: &ClickHouseConfig,
259) -> ClientBuilder {
260    let catalog = if catalog.is_empty() { "default" } else { catalog };
261    ArrowClient::builder()
262        .with_database(catalog)
263        .with_endpoint(&options.endpoint)
264        .with_username(&options.username)
265        .with_password(options.password.as_ref().map(Secret::get).unwrap_or_default())
266        .with_settings(config.settings.clone())
267        .with_compression(match config.compression.unwrap_or_default() {
268            ClickHouseCompression::None => CompressionMethod::None,
269            ClickHouseCompression::Lz4 => CompressionMethod::LZ4,
270            ClickHouseCompression::Zstd => CompressionMethod::ZSTD,
271        })
272        .with_tls(options.tls.as_ref().is_some_and(|tls| tls.enable))
273        // Ensure strings are Utf8 encoded for UI.
274        .with_arrow_options(ArrowOptions::default().with_strings_as_strings(true))
275}