1use std::fmt;
2use std::ops::{Deref, DerefMut};
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use clickhouse::Client;
8use deadpool::managed::{Manager, Metrics, Object, PoolError, RecycleError};
9use thiserror::Error;
10use tokio::task;
11use tokio::time::timeout;
12
13use crate::config::{ClickhouseConfig, DatalakeConfig};
14use crate::metrics::{Kind, MetricConfig, Registry, SharedRegistrar};
15
16#[derive(Debug, Error)]
17pub enum ClickhouseError {
18 #[error("Clickhouse client error: {0}")]
19 Client(#[from] clickhouse::error::Error),
20
21 #[error("Connection validation failed: {0}")]
22 Validation(String),
23
24 #[error("Connection timed out")]
25 Timeout,
26
27 #[error("Pool error: {0}")]
28 Pool(String),
29
30 #[error("Shutdown in progress")]
31 ShuttingDown,
32
33 #[error("Batch insertion error: {0}")]
34 BatchInsertionError(String),
35}
36
37impl From<tokio::time::error::Elapsed> for ClickhouseError {
38 fn from(_: tokio::time::error::Elapsed) -> Self {
39 Self::Timeout
40 }
41}
42
43impl<T: std::fmt::Display> From<PoolError<T>> for ClickhouseError {
44 fn from(value: PoolError<T>) -> Self {
45 Self::Pool(value.to_string())
46 }
47}
48
/// Point-in-time snapshot of the connection pool, as returned by
/// `ClickhouseConnectionPool::status`.
#[derive(Debug, Clone)]
pub struct PoolMetrics {
    /// The pool status' `size` value.
    pub size: usize,
    /// The pool status' `available` value.
    pub available: usize,
    /// Computed as `size - available`.
    pub in_use: usize,
    /// The pool status' `max_size` value.
    pub max_size: usize,
    /// NOTE(review): currently filled from the pool's `max_size` as well.
    pub min_size: usize,
    /// The pool status' `waiting` value.
    pub waiters: usize,
}
58
59pub struct ClickhouseConnection {
60 client: Client,
61 last_used: Instant,
62 id: u64,
63 query_count: AtomicU64,
64 created_at: Instant,
65}
66
67impl ClickhouseConnection {
68 pub fn new(client: Client, id: u64) -> Self {
69 Self {
70 client,
71 last_used: Instant::now(),
72 id,
73 query_count: AtomicU64::new(0),
74 created_at: Instant::now(),
75 }
76 }
77
78 pub fn id(&self) -> u64 {
79 self.id
80 }
81
82 pub fn age(&self) -> Duration {
83 self.created_at.elapsed()
84 }
85
86 pub fn idle_time(&self) -> Duration {
87 self.last_used.elapsed()
88 }
89
90 pub fn query_count(&self) -> u64 {
91 self.query_count.load(Ordering::Relaxed)
92 }
93
94 pub async fn health_check(&self) -> Result<(), ClickhouseError> {
95 match self.client.query("SELECT 1").execute().await {
96 Ok(_) => Ok(()),
97 Err(e) => Err(ClickhouseError::Client(e)),
98 }
99 }
100}
101
102impl Deref for ClickhouseConnection {
103 type Target = Client;
104
105 fn deref(&self) -> &Self::Target {
106 &self.client
107 }
108}
109
110impl DerefMut for ClickhouseConnection {
111 fn deref_mut(&mut self) -> &mut Self::Target {
112 &mut self.client
113 }
114}
115
116impl fmt::Debug for ClickhouseConnection {
117 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118 f.debug_struct("ClickhouseConnection")
119 .field("id", &self.id)
120 .field("created_at", &self.created_at)
121 .field("query_count", &self.query_count)
122 .field("last_used", &self.last_used)
123 .finish()
124 }
125}
126
/// Classifies a SQL statement by its leading keyword.
///
/// Leading whitespace is skipped and the keyword is compared
/// ASCII-case-insensitively. Returns one of `"select"`, `"insert"`,
/// `"create"`, `"alter"`, `"drop"`, or `"other"` when none of those prefixes
/// match (including for an empty string).
///
/// This is a plain prefix test, so any text that merely begins with a keyword
/// (for example `SELECTED`) is classified under that keyword.
///
/// Only the first few bytes of the statement are inspected; the query is never
/// copied or upper-cased as a whole, so this is cheap for very large
/// statements.
pub fn get_query_type(query: &str) -> &'static str {
    // (keyword to look for, label to report)
    const KEYWORDS: [(&str, &str); 5] = [
        ("SELECT", "select"),
        ("INSERT", "insert"),
        ("CREATE", "create"),
        ("ALTER", "alter"),
        ("DROP", "drop"),
    ];

    // Compare on bytes so a multi-byte character near the start can never
    // cause a slice to land on a non-character boundary.
    let head = query.trim_start().as_bytes();

    KEYWORDS
        .iter()
        .find(|(keyword, _)| {
            head.get(..keyword.len())
                .is_some_and(|prefix| prefix.eq_ignore_ascii_case(keyword.as_bytes()))
        })
        .map_or("other", |&(_, label)| label)
}
144
145#[derive(Debug)]
146pub struct ClickhouseConnectionManager {
147 config: Arc<ClickhouseConfig>,
148 next_connection_id: AtomicU64,
149 is_shutting_down: Arc<AtomicBool>,
150 metrics: Option<SharedRegistrar>,
151}
152
153impl ClickhouseConnectionManager {
154 pub fn new(config: Arc<ClickhouseConfig>, metrics: Option<SharedRegistrar>) -> Self {
155 Self {
156 config,
157 next_connection_id: AtomicU64::new(1),
158 is_shutting_down: Arc::new(AtomicBool::new(false)),
159 metrics,
160 }
161 }
162
163 pub fn initiate_shutdown(&self) {
164 self.is_shutting_down.store(true, Ordering::SeqCst);
165 log::info!("Clickhouse connection manager shutdown in progress");
166 }
167
168 pub fn create_client(&self) -> Result<Client, ClickhouseError> {
169 let url = self.config.authenticated_connection_url();
170
171 let client = Client::default()
172 .with_url(url)
173 .with_user(&self.config.username)
174 .with_password(&self.config.password)
175 .with_option("async_insert", "1")
176 .with_option("wait_for_async_insert", "1");
177
178 Ok(client)
179 }
180}
181
182impl Manager for ClickhouseConnectionManager {
183 type Type = ClickhouseConnection;
184 type Error = ClickhouseError;
185
186 async fn create(&self) -> Result<Self::Type, Self::Error> {
187 if self.is_shutting_down.load(Ordering::SeqCst) {
188 return Err(ClickhouseError::ShuttingDown);
189 }
190
191 let connection_id = self.next_connection_id.fetch_add(1, Ordering::SeqCst);
192
193 let start = Instant::now();
194
195 let config = &self.config.clone();
196 log::debug!(
197 "Creating new Clickhouse connection [id: {}] to: {}:{}",
198 connection_id,
199 config.host,
200 config.port
201 );
202
203 let client = self.create_client()?;
204
205 let validation_timeout = Duration::from_secs(config.connect_timeout_seconds);
206
207 let validation = match timeout(validation_timeout, client.query("SELECT 1").execute()).await
208 {
209 Ok(Ok(_)) => Ok(()),
210 Ok(Err(e)) => Err(ClickhouseError::Client(e)),
211 Err(_) => Err(ClickhouseError::Timeout),
212 };
213
214 let duration = start.elapsed();
215 if let Some(metrics) = &self.metrics {
216 metrics.set_gauge_vec_mut(
217 "clickhouse_connection_creation_second",
218 &["create"],
219 duration.as_secs_f64(),
220 );
221 }
222
223 match validation {
224 Ok(()) => {
225 log::debug!(
226 "Connection established: [id: {}] in {:?}",
227 connection_id,
228 duration
229 );
230
231 if let Some(metrics) = &self.metrics {
232 metrics.inc_int_counter_vec_mut(
233 "clickhouse_connections_created_total",
234 &["success"],
235 );
236 }
237
238 Ok(ClickhouseConnection::new(client, connection_id))
239 }
240 Err(e) => {
241 log::error!(
242 "Failed to validate ClickHouse connection (id: {}): {}",
243 connection_id,
244 e
245 );
246
247 if let Some(metrics) = &self.metrics {
248 metrics.inc_int_counter_vec_mut(
249 "clickhouse_connections_created_total",
250 &["failure"],
251 );
252 }
253
254 Err(e)
255 }
256 }
257 }
258
259 async fn recycle(
260 &self,
261 conn: &mut Self::Type,
262 _: &Metrics,
263 ) -> Result<(), RecycleError<Self::Error>> {
264 if self.is_shutting_down.load(Ordering::SeqCst) {
265 return Err(RecycleError::Message("Shutting down".into()));
266 }
267
268 log::debug!("Testing health of connection: [id: {}]", conn.id());
269
270 let validation_timeout = Duration::from_secs(self.config.connect_timeout_seconds);
271
272 match timeout(validation_timeout, conn.query("SELECT 1").execute()).await {
273 Ok(Ok(_)) => {
274 log::debug!("Connection [id: {}] health check passed", conn.id());
275
276 if let Some(metrics) = &self.metrics {
277 metrics.inc_int_counter_vec_mut(
278 "clickhouse_connections_health_checks_total",
279 &["success"],
280 );
281 }
282
283 Ok(())
284 }
285 Ok(Err(e)) => {
286 log::warn!("Connection [id: {}] health check failed: {}", conn.id(), e);
287
288 if let Some(metrics) = &self.metrics {
289 metrics.inc_int_counter_vec_mut(
290 "clickhouse_connections_health_checks_total",
291 &["failure"],
292 );
293 }
294
295 Err(RecycleError::Message(
296 format!("Health check failed: {}", e).into(),
297 ))
298 }
299 Err(_) => {
300 log::warn!(
301 "Connection [id: {}] health check timed out after: {:?}",
302 conn.id(),
303 validation_timeout
304 );
305
306 if let Some(metrics) = &self.metrics {
307 metrics.inc_int_counter_vec_mut(
308 "clickhouse_connections_health_checks_total",
309 &["timeout"],
310 );
311 }
312
313 Err(RecycleError::Message("Health check timed out".into()))
314 }
315 }
316 }
317}
318
319pub type Pool = deadpool::managed::Pool<ClickhouseConnectionManager>;
320pub type PooledConnection = Object<ClickhouseConnectionManager>;
321
322pub struct ClickhouseConnectionPool {
323 pool: Pool,
324 config: Arc<DatalakeConfig>,
325 metrics: Option<SharedRegistrar>,
326 is_initialized: AtomicBool,
327}
328
329impl ClickhouseConnectionPool {
330 pub fn new(config: Arc<DatalakeConfig>, metrics: Option<SharedRegistrar>) -> Self {
331 if let Some(metrics_ref) = &metrics {
332 Self::register_metrics(metrics_ref);
333 }
334
335 let initial_size = config.clickhouse.max_connections as usize;
336
337 let manager = ClickhouseConnectionManager::new(config.clickhouse.clone(), metrics.clone());
338
339 let pool = deadpool::managed::Pool::<ClickhouseConnectionManager>::builder(manager)
340 .max_size(initial_size)
341 .build()
342 .expect("Failed to build connection pool");
343
344 Self {
345 pool,
346 config,
347 metrics,
348 is_initialized: AtomicBool::new(false),
349 }
350 }
351
352 pub async fn initialize(&self) -> Result<(), ClickhouseError> {
353 if self.is_initialized.load(Ordering::SeqCst) {
354 return Ok(());
355 }
356
357 log::info!("Initializing Clickhouse connection pool");
358
359 let warmup_count = self.config.clickhouse.max_connections as usize;
360
361 let mut warmup_handles = Vec::with_capacity(warmup_count);
362
363 for i in 0..warmup_count {
364 let pool = self.pool.clone();
365
366 let handle = task::spawn(async move {
367 match pool.get().await {
368 Ok(conn) => match conn.health_check().await {
369 Ok(_) => {
370 log::debug!("Warm-up connection {} initialized successfully", i);
371 Ok(())
372 }
373 Err(e) => {
374 log::error!("Warm-up connection {} health check failed: {}", i, e);
375 Err(e)
376 }
377 },
378 Err(e) => {
379 log::error!("Failed to get warm-up connection {}: {}", i, e);
380 Err(ClickhouseError::Pool(e.to_string()))
381 }
382 }
383 });
384 warmup_handles.push(handle);
385 }
386
387 let mut warmup_success_count = 0;
388 for (i, handle) in warmup_handles.into_iter().enumerate() {
389 match handle.await {
390 Ok(Ok(_)) => {
391 warmup_success_count += 1;
392 }
393 Ok(Err(e)) => {
394 log::warn!("Warm-up connection {} failed: {}", i, e);
395 }
396 Err(e) => {
397 log::error!("Warm-up task {} panicked: {}", i, e);
398 }
399 }
400 }
401
402 log::info!(
403 "Connection pool warm-up complete: {}/{} successful",
404 warmup_success_count,
405 warmup_count
406 );
407
408 self.is_initialized.store(true, Ordering::SeqCst);
409
410 if let Some(metrics) = &self.metrics {
411 let status = self.pool.status();
412 metrics.set_int_gauge_vec_mut(
413 "clickhouse_pool_connections",
414 &["available"],
415 status.available as i64,
416 );
417 metrics.set_int_gauge_vec_mut(
418 "clickhouse_pool_connections",
419 &["size"],
420 status.size as i64,
421 );
422 }
423
424 Ok(())
425 }
426
427 pub async fn get_connection(&self) -> Result<PooledConnection, ClickhouseError> {
428 if !self.is_initialized.load(Ordering::SeqCst) {
429 log::warn!("Attempting to get connection from uninitialized pool");
430 }
431
432 let start = Instant::now();
433
434 let timeout_duration = Duration::from_secs(self.config.clickhouse.connect_timeout_seconds);
435
436 for attempt in 0..3 {
437 match tokio::time::timeout(timeout_duration, self.pool.get()).await {
438 Ok(Ok(conn)) => {
439 let duration = start.elapsed();
440
441 if let Some(metrics) = &self.metrics {
442 metrics.set_gauge_vec_mut(
443 "clickhouse_connection_acquisition_seconds",
444 &["success"],
445 duration.as_secs_f64(),
446 );
447 metrics.inc_int_counter_vec_mut(
448 "clickhouse_connection_acquisition_total",
449 &["success"],
450 );
451 }
452
453 log::debug!(
454 "Connection acquired in {:?} (attempt {})",
455 duration,
456 attempt + 1
457 );
458 return Ok(conn);
459 }
460 Ok(Err(e)) => {
461 if let Some(metrics) = &self.metrics {
462 metrics.inc_int_counter_vec_mut(
463 "clickhouse_connection_acquisition_total",
464 &["failure"],
465 );
466 }
467
468 log::warn!(
469 "Failed to get connection from pool (attempt {}): {}",
470 attempt + 1,
471 e
472 );
473
474 if attempt >= 2 {
475 return Err(ClickhouseError::Pool(e.to_string()));
476 }
477 }
478 Err(_) => {
479 if let Some(metrics) = &self.metrics {
480 metrics.inc_int_counter_vec_mut(
481 "clickhouse_connection_acquisition_total",
482 &["timeout"],
483 );
484 }
485
486 log::warn!(
487 "Timed out waiting for connection (attempt {}) after {:?}",
488 attempt + 1,
489 timeout_duration
490 );
491
492 if attempt >= 2 {
493 return Err(ClickhouseError::Timeout);
494 }
495 }
496 }
497 let backoff = Duration::from_millis(50 * 2u64.pow(attempt));
498 tokio::time::sleep(backoff).await;
499 }
500 Err(ClickhouseError::Pool(
501 "Failed to get connection after retries".to_string(),
502 ))
503 }
504
505 pub async fn shutdown(&self) -> Result<(), ClickhouseError> {
506 log::info!("Initiating graceful shutdown of ClickHouse connection pool");
507
508 let pool_manager = self.pool.manager();
509 pool_manager.initiate_shutdown();
510
511 let status = self.pool.status();
512 log::info!(
513 "Connection pool status before shutdown: size={}, available={}, in_use={}",
514 status.size,
515 status.available,
516 status.size - status.available
517 );
518
519 let drain_timeout = Duration::from_secs(30);
520 let drain_start = Instant::now();
521
522 loop {
523 let status = self.pool.status();
524 let in_use = status.size - status.available;
525
526 if in_use == 0 {
527 log::info!("All connections returned to pool, proceeding with shutdown");
528 break;
529 }
530
531 if drain_start.elapsed() > drain_timeout {
532 log::warn!(
533 "Shutdown drain timeout exceeded, {} connections still in use",
534 in_use
535 );
536
537 break;
538 }
539
540 log::info!("Waiting for {} connections to be returned to pool", in_use);
541 tokio::time::sleep(Duration::from_secs(1)).await;
542 }
543
544 self.pool.close();
546 log::info!("All connections closed");
547
548 log::info!("ClickHouse connection pool shutdown complete");
549
550 Ok(())
551 }
552
553 fn register_metrics(metrics: &SharedRegistrar) {
554 let metric_configs = [
555 MetricConfig {
557 kind: Kind::IntCounterVec,
558 name: "clickhouse_connections_created_total",
559 help: "Total no. of connections created",
560 label_names: &["status"],
561 },
562 MetricConfig {
563 kind: Kind::IntGaugeVec,
564 name: "clickhouse_pool_connections",
565 help: "Current no. of connections in the pool",
566 label_names: &["state"],
567 },
568 MetricConfig {
569 kind: Kind::IntCounterVec,
570 name: "clickhouse_connetion_health_checks_total",
571 help: "Total no. of connection health checks",
572 label_names: &["status"],
573 },
574 MetricConfig {
575 kind: Kind::GaugeVec,
576 name: "clickhouse_connection_creation_seconds",
577 help: "Time taken to create connections",
578 label_names: &["operation"],
579 },
580 MetricConfig {
582 kind: Kind::IntCounterVec,
583 name: "clickhouse_queries_total",
584 help: "Total no. of queries executed",
585 label_names: &["type"],
586 },
587 MetricConfig {
588 kind: Kind::IntCounterVec,
589 name: "clickhouse_query_errors_total",
590 help: "Total no. of query errors",
591 label_names: &["type"],
592 },
593 MetricConfig {
594 kind: Kind::GaugeVec,
595 name: "clickhouse_query_duration_seconds",
596 help: "Query execution time in seconds",
597 label_names: &["type"],
598 },
599 MetricConfig {
601 kind: Kind::IntCounterVec,
602 name: "clickhouse_batch_query_errors_total",
603 help: "Total number of batch query errors",
604 label_names: &["type"],
605 },
606 MetricConfig {
607 kind: Kind::GaugeVec,
608 name: "clickhouse_batch_query_duration_seconds",
609 help: "Batch query execution time in seconds",
610 label_names: &["type"],
611 },
612 MetricConfig {
614 kind: Kind::GaugeVec,
615 name: "clickhouse_connection_acquisition_seconds",
616 help: "Time taken to acquire a connection from the pool",
617 label_names: &["status"],
618 },
619 MetricConfig {
620 kind: Kind::IntCounterVec,
621 name: "clickhouse_connection_acquisition_total",
622 help: "Total number of connection acquisition attempts",
623 label_names: &["status"],
624 },
625 MetricConfig {
626 kind: Kind::IntCounterVec,
627 name: "clickhouse_connections_recycled_total",
628 help: "Total number of connections recycled",
629 label_names: &["reason"],
630 },
631 MetricConfig {
632 kind: Kind::GaugeVec,
633 name: "clickhouse_connection_recycling_seconds",
634 help: "Time taken for connection recycling",
635 label_names: &["operation"],
636 },
637 ];
638
639 metrics.with_metric_configs(&metric_configs).ok();
640 }
641
642 pub fn status(&self) -> PoolMetrics {
643 let status = self.pool.status();
644
645 PoolMetrics {
646 size: status.size,
647 available: status.available,
648 in_use: status.size - status.available,
649 max_size: status.max_size,
650 min_size: status.max_size,
651 waiters: status.waiting,
652 }
653 }
654}