1use std::collections::BTreeMap;
36use std::hash::{BuildHasher, Hash, RandomState};
37use std::sync::atomic::{AtomicBool, Ordering};
38use std::sync::{Arc, LazyLock};
39use std::time::Duration;
40
41use async_trait::async_trait;
42use clickhouse_datafusion::prelude::clickhouse_arrow::{
43 ArrowClient, ArrowFormat, ArrowOptions, ClientBuilder, CompressionMethod, ConnectionManager,
44 ConnectionPool, bb8,
45};
46use clickhouse_datafusion::{ClickHouseBuilder, ClickHouseSessionContext};
47use datafusion::error::DataFusionError;
48use datafusion::execution::context::SessionContext;
49use datafusion::prelude::{DataFrame, SessionConfig};
50use serde::{Deserialize, Serialize};
51use tracing::debug;
52
53use super::{ConnectionOptions, PoolOptions, Secret};
54use crate::backend::{Backend, BackendMetadata, Capability, ConnectionKind, ConnectionMetadata};
55use crate::context::{DEFAULT_SESSION_CAPABILITIES, QuerySession, SessionCapability};
56use crate::error::{Error, Result};
57use crate::response::{ListSummary, TableSummary};
58
59static CLICKHOUSE_METADATA: LazyLock<BackendMetadata> = LazyLock::new(|| BackendMetadata {
60 kind: ConnectionKind::Database,
61 capabilities: vec![Capability::ExecuteSql, Capability::List],
62});
63
64pub const CLICKHOUSE_CATALOG: &str = "clickhouse";
65
66#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema)]
68pub struct ClickHouseConfig {
69 #[serde(default)]
70 pub settings: BTreeMap<String, String>,
71 #[serde(skip_serializing_if = "Option::is_none")]
72 pub compression: Option<ClickHouseCompression>,
73}
74
75#[derive(
77 Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema,
78)]
79#[serde(rename_all = "snake_case")]
80pub enum ClickHouseCompression {
81 None,
82 #[default]
83 Lz4,
84 Zstd,
85}
86
/// Query session backed by a [`ClickHouseSessionContext`].
///
/// Newtype wrapper; this file implements `QuerySession` and `Default` for it.
#[derive(Clone)]
pub struct QuerySessionContext(ClickHouseSessionContext);
92
93impl QuerySessionContext {
94 pub fn new() -> Self {
102 let config = SessionConfig::default()
103 .with_information_schema(true)
104 .set_bool("datafusion.execution.parquet.binary_as_string", true);
106 let session = SessionContext::new_with_config(config);
107 Self(
108 ClickHouseSessionContext::from(session)
109 .with_session_transform(SessionContext::enable_url_table),
110 )
111 }
112}
113
114impl Default for QuerySessionContext {
115 fn default() -> Self { Self::new() }
116}
117
118#[async_trait]
119impl QuerySession for QuerySessionContext {
120 fn as_session(&self) -> &SessionContext { self.0.session_context() }
121
122 fn capabilities(&self) -> &[SessionCapability] { DEFAULT_SESSION_CAPABILITIES }
123
124 async fn sql(&self, sql: &str) -> Result<DataFrame> {
125 self.0.sql(sql).await.map_err(Error::DataFusion)
126 }
127}
128
/// [`Backend`] that registers a ClickHouse server as a DataFusion catalog and
/// answers listing requests through a pooled connection.
pub struct ClickHouseBackend {
    /// Identifier handed to `ClickHouseBuilder::build_catalog_from_pool`; produced in
    /// `try_new` by hashing the metadata with a freshly seeded `RandomState`.
    key: String,
    /// Connection metadata returned from `Backend::connection`.
    metadata: ConnectionMetadata,
    /// Endpoint copied from the connection options, used when building the catalog.
    endpoint: String,
    /// bb8 pool of Arrow-format ClickHouse connections (built in `try_new`).
    pool: ConnectionPool<ArrowFormat>,
    /// Set to `true` after `prepare_session` has registered the catalog once.
    registered: Arc<AtomicBool>,
}
139
140impl ClickHouseBackend {
141 pub async fn try_new(
146 id: impl Into<String>,
147 name: impl Into<String>,
148 options: &ConnectionOptions,
149 config: Option<ClickHouseConfig>,
150 connect: PoolOptions,
151 ) -> Result<Self> {
152 let metadata = ConnectionMetadata {
153 id: id.into(),
154 name: name.into(),
155 catalog: Some(CLICKHOUSE_CATALOG.to_string()),
156 metadata: CLICKHOUSE_METADATA.clone(),
157 };
158
159 let key = RandomState::new().hash_one(&metadata).to_string();
160 let endpoint = options.endpoint.clone();
161 let config = config.unwrap_or_default();
162 let builder = create_client_builder("", options, &config);
163 let manager = ConnectionManager::try_new_with_builder(builder)
164 .await
165 .map_err(Error::ClickHouseArrow)?;
166 let pool_size = connect.pool_size.unwrap_or(1);
167 let timeout = connect.connect_timeout.map_or(30, u64::from);
168 let pool = bb8::Builder::new()
169 .max_size(pool_size)
170 .min_idle(pool_size)
171 .test_on_check_out(true)
172 .max_lifetime(Duration::from_secs(60 * 60 * 2))
173 .idle_timeout(Duration::from_secs(60 * 60 * 2))
174 .connection_timeout(Duration::from_secs(timeout))
175 .retry_connection(false)
176 .queue_strategy(bb8::QueueStrategy::Fifo)
177 .build(manager)
178 .await
179 .map_err(|e| DataFusionError::External(e.into()))
180 .map_err(Error::ClickHouseDatafusion)?;
181
182 Ok(Self { key, metadata, endpoint, pool, registered: Arc::new(AtomicBool::new(false)) })
183 }
184
185 pub fn metadata() -> BackendMetadata { CLICKHOUSE_METADATA.clone() }
186}
187
188#[async_trait]
189impl Backend for ClickHouseBackend {
190 fn connection(&self) -> &ConnectionMetadata { &self.metadata }
191
192 async fn prepare_session(&self, session: &SessionContext) -> Result<()> {
193 if self.registered.load(Ordering::Acquire) {
194 debug!("ClickHouse already registered, skipping registration");
195 return Ok(());
196 }
197
198 let pool = self.pool.clone();
199 let _clickhouse = ClickHouseBuilder::build_catalog_from_pool(
200 session,
201 &self.endpoint,
202 Some(CLICKHOUSE_CATALOG),
203 &self.key,
204 pool,
205 )
206 .await
207 .map_err(Error::ClickHouseDatafusion)?
208 .build(session)
209 .await
210 .map_err(Error::ClickHouseDatafusion)?;
211
212 self.registered.store(true, Ordering::Release);
214
215 Ok(())
216 }
217
218 async fn list(&self, database: Option<&str>) -> Result<ListSummary> {
219 if database.is_some_and(|d| !d.is_empty()) {
220 self.pool
221 .get()
222 .await
223 .map_err(|e| DataFusionError::External(e.into()))
224 .map_err(Error::ClickHouseDatafusion)?
225 .fetch_tables(database, None)
226 .await
227 .map_err(Error::from)
228 .map(|name| {
229 name.into_iter()
230 .map(|name| TableSummary { name, rows: None, size_bytes: None })
231 .collect()
232 })
233 .map(ListSummary::Tables)
234 .inspect(|result| {
235 tracing::debug!("------> ClickHouse list w/ database: {result:?}");
236 })
237 } else {
238 self.pool
239 .get()
240 .await
241 .map_err(|e| DataFusionError::External(e.into()))
242 .map_err(Error::ClickHouseDatafusion)?
243 .fetch_schemas(None)
244 .await
245 .map_err(Error::from)
246 .map(ListSummary::Databases)
247 .inspect(|result| {
248 tracing::debug!("------> ClickHouse list w/o database: {result:?}");
249 })
250 }
251 }
252}
253
254pub fn create_client_builder(
256 catalog: &str,
257 options: &ConnectionOptions,
258 config: &ClickHouseConfig,
259) -> ClientBuilder {
260 let catalog = if catalog.is_empty() { "default" } else { catalog };
261 ArrowClient::builder()
262 .with_database(catalog)
263 .with_endpoint(&options.endpoint)
264 .with_username(&options.username)
265 .with_password(options.password.as_ref().map(Secret::get).unwrap_or_default())
266 .with_settings(config.settings.clone())
267 .with_compression(match config.compression.unwrap_or_default() {
268 ClickHouseCompression::None => CompressionMethod::None,
269 ClickHouseCompression::Lz4 => CompressionMethod::LZ4,
270 ClickHouseCompression::Zstd => CompressionMethod::ZSTD,
271 })
272 .with_tls(options.tls.as_ref().is_some_and(|tls| tls.enable))
273 .with_arrow_options(ArrowOptions::default().with_strings_as_strings(true))
275}