Skip to main content

clickhouse_connection_pool/
pool.rs

1use std::fmt;
2use std::ops::{Deref, DerefMut};
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use clickhouse::Client;
8use deadpool::managed::{Manager, Metrics, Object, PoolError, RecycleError};
9use thiserror::Error;
10use tokio::task;
11use tokio::time::timeout;
12
13use crate::config::{ClickhouseConfig, DatalakeConfig};
14use crate::metrics::{Kind, MetricConfig, Registry, SharedRegistrar};
15
/// Errors surfaced by the ClickHouse connection manager and pool in this module.
#[derive(Debug, Error)]
pub enum ClickhouseError {
    /// An error reported by the underlying `clickhouse` client
    /// (converted automatically via `#[from]`).
    #[error("Clickhouse client error: {0}")]
    Client(#[from] clickhouse::error::Error),

    /// A connection failed validation; the payload is a human-readable reason.
    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("Connection validation failed: {0}")]
    Validation(String),

    /// An operation exceeded its deadline (also produced from `tokio`'s `Elapsed`).
    #[error("Connection timed out")]
    Timeout,

    /// A pool-level failure, carried as the stringified source error.
    #[error("Pool error: {0}")]
    Pool(String),

    /// Returned when a new connection is requested after shutdown was initiated.
    #[error("Shutdown in progress")]
    ShuttingDown,

    /// A batch insertion failed; the payload is a human-readable reason.
    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("Batch insertion error: {0}")]
    BatchInsertionError(String),
}
36
37impl From<tokio::time::error::Elapsed> for ClickhouseError {
38    fn from(_: tokio::time::error::Elapsed) -> Self {
39        Self::Timeout
40    }
41}
42
43impl<T: std::fmt::Display> From<PoolError<T>> for ClickhouseError {
44    fn from(value: PoolError<T>) -> Self {
45        Self::Pool(value.to_string())
46    }
47}
48
/// Point-in-time snapshot of the connection pool's occupancy,
/// as returned by `ClickhouseConnectionPool::status`.
#[derive(Debug, Clone)]
pub struct PoolMetrics {
    /// Number of connections currently managed by the pool.
    pub size: usize,
    /// Number of connections currently idle in the pool.
    pub available: usize,
    /// Connections currently checked out (`size` minus `available`).
    pub in_use: usize,
    /// Upper bound on the number of pooled connections.
    pub max_size: usize,
    /// Lower bound on the number of pooled connections.
    // NOTE(review): `status()` currently fills this from the pool's `max_size`.
    pub min_size: usize,
    /// Number of callers currently waiting for a connection.
    pub waiters: usize,
}
58
/// A pooled ClickHouse client together with bookkeeping about its lifetime.
pub struct ClickhouseConnection {
    /// The wrapped ClickHouse client (also reachable through `Deref`).
    client: Client,
    /// Timestamp used by `idle_time`; set when the connection is constructed.
    // NOTE(review): nothing in the visible code refreshes this after construction.
    last_used: Instant,
    /// Identifier assigned by the connection manager.
    id: u64,
    /// Counter reported by `query_count`; starts at zero.
    // NOTE(review): nothing in the visible code increments this.
    query_count: AtomicU64,
    /// Timestamp used by `age`; set when the connection is constructed.
    created_at: Instant,
}
66
67impl ClickhouseConnection {
68    pub fn new(client: Client, id: u64) -> Self {
69        Self {
70            client,
71            last_used: Instant::now(),
72            id,
73            query_count: AtomicU64::new(0),
74            created_at: Instant::now(),
75        }
76    }
77
78    pub fn id(&self) -> u64 {
79        self.id
80    }
81
82    pub fn age(&self) -> Duration {
83        self.created_at.elapsed()
84    }
85
86    pub fn idle_time(&self) -> Duration {
87        self.last_used.elapsed()
88    }
89
90    pub fn query_count(&self) -> u64 {
91        self.query_count.load(Ordering::Relaxed)
92    }
93
94    pub async fn health_check(&self) -> Result<(), ClickhouseError> {
95        match self.client.query("SELECT 1").execute().await {
96            Ok(_) => Ok(()),
97            Err(e) => Err(ClickhouseError::Client(e)),
98        }
99    }
100}
101
102impl Deref for ClickhouseConnection {
103    type Target = Client;
104
105    fn deref(&self) -> &Self::Target {
106        &self.client
107    }
108}
109
110impl DerefMut for ClickhouseConnection {
111    fn deref_mut(&mut self) -> &mut Self::Target {
112        &mut self.client
113    }
114}
115
116impl fmt::Debug for ClickhouseConnection {
117    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118        f.debug_struct("ClickhouseConnection")
119            .field("id", &self.id)
120            .field("created_at", &self.created_at)
121            .field("query_count", &self.query_count)
122            .field("last_used", &self.last_used)
123            .finish()
124    }
125}
126
/// Classifies a SQL statement by its leading keyword.
///
/// Leading whitespace is skipped and the keyword is matched ASCII
/// case-insensitively. Returns one of `"select"`, `"insert"`, `"create"`,
/// `"alter"`, `"drop"`, or `"other"` when no known keyword leads the query
/// (including empty input).
///
/// Only the first few bytes of `query` are inspected, so no copy of the
/// statement is made — this matters for large batch `INSERT` bodies.
pub fn get_query_type(query: &str) -> &'static str {
    // (keyword to look for, label to report)
    const KEYWORDS: [(&str, &str); 5] = [
        ("SELECT", "select"),
        ("INSERT", "insert"),
        ("CREATE", "create"),
        ("ALTER", "alter"),
        ("DROP", "drop"),
    ];

    // Work on bytes so slicing can never land inside a multi-byte character.
    let head = query.trim_start().as_bytes();

    KEYWORDS
        .iter()
        .find(|(keyword, _)| {
            head.get(..keyword.len())
                .is_some_and(|prefix| prefix.eq_ignore_ascii_case(keyword.as_bytes()))
        })
        .map_or("other", |&(_, label)| label)
}
144
/// `deadpool` manager that creates and health-checks ClickHouse connections.
#[derive(Debug)]
pub struct ClickhouseConnectionManager {
    /// Connection settings (host, port, credentials, timeouts).
    config: Arc<ClickhouseConfig>,
    /// Source of connection identifiers; starts at 1.
    next_connection_id: AtomicU64,
    /// Set once shutdown has been initiated; blocks creation and recycling.
    is_shutting_down: Arc<AtomicBool>,
    /// Optional metrics registrar; when `None`, no metrics are recorded.
    metrics: Option<SharedRegistrar>,
}
152
153impl ClickhouseConnectionManager {
154    pub fn new(config: Arc<ClickhouseConfig>, metrics: Option<SharedRegistrar>) -> Self {
155        Self {
156            config,
157            next_connection_id: AtomicU64::new(1),
158            is_shutting_down: Arc::new(AtomicBool::new(false)),
159            metrics,
160        }
161    }
162
163    pub fn initiate_shutdown(&self) {
164        self.is_shutting_down.store(true, Ordering::SeqCst);
165        log::info!("Clickhouse connection manager shutdown in progress");
166    }
167
168    pub fn create_client(&self) -> Result<Client, ClickhouseError> {
169        let url = self.config.authenticated_connection_url();
170
171        let client = Client::default()
172            .with_url(url)
173            .with_user(&self.config.username)
174            .with_password(&self.config.password)
175            .with_option("async_insert", "1")
176            .with_option("wait_for_async_insert", "1");
177
178        Ok(client)
179    }
180}
181
182impl Manager for ClickhouseConnectionManager {
183    type Type = ClickhouseConnection;
184    type Error = ClickhouseError;
185
186    async fn create(&self) -> Result<Self::Type, Self::Error> {
187        if self.is_shutting_down.load(Ordering::SeqCst) {
188            return Err(ClickhouseError::ShuttingDown);
189        }
190
191        let connection_id = self.next_connection_id.fetch_add(1, Ordering::SeqCst);
192
193        let start = Instant::now();
194
195        let config = &self.config.clone();
196        log::debug!(
197            "Creating new Clickhouse connection [id: {}] to: {}:{}",
198            connection_id,
199            config.host,
200            config.port
201        );
202
203        let client = self.create_client()?;
204
205        let validation_timeout = Duration::from_secs(config.connect_timeout_seconds);
206
207        let validation = match timeout(validation_timeout, client.query("SELECT 1").execute()).await
208        {
209            Ok(Ok(_)) => Ok(()),
210            Ok(Err(e)) => Err(ClickhouseError::Client(e)),
211            Err(_) => Err(ClickhouseError::Timeout),
212        };
213
214        let duration = start.elapsed();
215        if let Some(metrics) = &self.metrics {
216            metrics.set_gauge_vec_mut(
217                "clickhouse_connection_creation_second",
218                &["create"],
219                duration.as_secs_f64(),
220            );
221        }
222
223        match validation {
224            Ok(()) => {
225                log::debug!(
226                    "Connection established: [id: {}] in {:?}",
227                    connection_id,
228                    duration
229                );
230
231                if let Some(metrics) = &self.metrics {
232                    metrics.inc_int_counter_vec_mut(
233                        "clickhouse_connections_created_total",
234                        &["success"],
235                    );
236                }
237
238                Ok(ClickhouseConnection::new(client, connection_id))
239            }
240            Err(e) => {
241                log::error!(
242                    "Failed to validate ClickHouse connection (id: {}): {}",
243                    connection_id,
244                    e
245                );
246
247                if let Some(metrics) = &self.metrics {
248                    metrics.inc_int_counter_vec_mut(
249                        "clickhouse_connections_created_total",
250                        &["failure"],
251                    );
252                }
253
254                Err(e)
255            }
256        }
257    }
258
259    async fn recycle(
260        &self,
261        conn: &mut Self::Type,
262        _: &Metrics,
263    ) -> Result<(), RecycleError<Self::Error>> {
264        if self.is_shutting_down.load(Ordering::SeqCst) {
265            return Err(RecycleError::Message("Shutting down".into()));
266        }
267
268        log::debug!("Testing health of connection: [id: {}]", conn.id());
269
270        let validation_timeout = Duration::from_secs(self.config.connect_timeout_seconds);
271
272        match timeout(validation_timeout, conn.query("SELECT 1").execute()).await {
273            Ok(Ok(_)) => {
274                log::debug!("Connection [id: {}] health check passed", conn.id());
275
276                if let Some(metrics) = &self.metrics {
277                    metrics.inc_int_counter_vec_mut(
278                        "clickhouse_connections_health_checks_total",
279                        &["success"],
280                    );
281                }
282
283                Ok(())
284            }
285            Ok(Err(e)) => {
286                log::warn!("Connection [id: {}] health check failed: {}", conn.id(), e);
287
288                if let Some(metrics) = &self.metrics {
289                    metrics.inc_int_counter_vec_mut(
290                        "clickhouse_connections_health_checks_total",
291                        &["failure"],
292                    );
293                }
294
295                Err(RecycleError::Message(
296                    format!("Health check failed: {}", e).into(),
297                ))
298            }
299            Err(_) => {
300                log::warn!(
301                    "Connection [id: {}] health check timed out after: {:?}",
302                    conn.id(),
303                    validation_timeout
304                );
305
306                if let Some(metrics) = &self.metrics {
307                    metrics.inc_int_counter_vec_mut(
308                        "clickhouse_connections_health_checks_total",
309                        &["timeout"],
310                    );
311                }
312
313                Err(RecycleError::Message("Health check timed out".into()))
314            }
315        }
316    }
317}
318
/// `deadpool` managed pool specialised for [`ClickhouseConnectionManager`].
pub type Pool = deadpool::managed::Pool<ClickhouseConnectionManager>;
/// A connection checked out of [`Pool`].
pub type PooledConnection = Object<ClickhouseConnectionManager>;
321
/// ClickHouse connection pool with warm-up, retrying acquisition,
/// graceful shutdown and optional metrics reporting.
pub struct ClickhouseConnectionPool {
    /// The underlying `deadpool` pool.
    pool: Pool,
    /// Application configuration; the `clickhouse` section drives pool sizing and timeouts.
    config: Arc<DatalakeConfig>,
    /// Optional metrics registrar; when `None`, no metrics are recorded.
    metrics: Option<SharedRegistrar>,
    /// Set once `initialize` has completed its warm-up pass.
    is_initialized: AtomicBool,
}
328
329impl ClickhouseConnectionPool {
330    pub fn new(config: Arc<DatalakeConfig>, metrics: Option<SharedRegistrar>) -> Self {
331        if let Some(metrics_ref) = &metrics {
332            Self::register_metrics(metrics_ref);
333        }
334
335        let initial_size = config.clickhouse.max_connections as usize;
336
337        let manager = ClickhouseConnectionManager::new(config.clickhouse.clone(), metrics.clone());
338
339        let pool = deadpool::managed::Pool::<ClickhouseConnectionManager>::builder(manager)
340            .max_size(initial_size)
341            .build()
342            .expect("Failed to build connection pool");
343
344        Self {
345            pool,
346            config,
347            metrics,
348            is_initialized: AtomicBool::new(false),
349        }
350    }
351
352    pub async fn initialize(&self) -> Result<(), ClickhouseError> {
353        if self.is_initialized.load(Ordering::SeqCst) {
354            return Ok(());
355        }
356
357        log::info!("Initializing Clickhouse connection pool");
358
359        let warmup_count = self.config.clickhouse.max_connections as usize;
360
361        let mut warmup_handles = Vec::with_capacity(warmup_count);
362
363        for i in 0..warmup_count {
364            let pool = self.pool.clone();
365
366            let handle = task::spawn(async move {
367                match pool.get().await {
368                    Ok(conn) => match conn.health_check().await {
369                        Ok(_) => {
370                            log::debug!("Warm-up connection {} initialized successfully", i);
371                            Ok(())
372                        }
373                        Err(e) => {
374                            log::error!("Warm-up connection {} health check failed: {}", i, e);
375                            Err(e)
376                        }
377                    },
378                    Err(e) => {
379                        log::error!("Failed to get warm-up connection {}: {}", i, e);
380                        Err(ClickhouseError::Pool(e.to_string()))
381                    }
382                }
383            });
384            warmup_handles.push(handle);
385        }
386
387        let mut warmup_success_count = 0;
388        for (i, handle) in warmup_handles.into_iter().enumerate() {
389            match handle.await {
390                Ok(Ok(_)) => {
391                    warmup_success_count += 1;
392                }
393                Ok(Err(e)) => {
394                    log::warn!("Warm-up connection {} failed: {}", i, e);
395                }
396                Err(e) => {
397                    log::error!("Warm-up task {} panicked: {}", i, e);
398                }
399            }
400        }
401
402        log::info!(
403            "Connection pool warm-up complete: {}/{} successful",
404            warmup_success_count,
405            warmup_count
406        );
407
408        self.is_initialized.store(true, Ordering::SeqCst);
409
410        if let Some(metrics) = &self.metrics {
411            let status = self.pool.status();
412            metrics.set_int_gauge_vec_mut(
413                "clickhouse_pool_connections",
414                &["available"],
415                status.available as i64,
416            );
417            metrics.set_int_gauge_vec_mut(
418                "clickhouse_pool_connections",
419                &["size"],
420                status.size as i64,
421            );
422        }
423
424        Ok(())
425    }
426
427    pub async fn get_connection(&self) -> Result<PooledConnection, ClickhouseError> {
428        if !self.is_initialized.load(Ordering::SeqCst) {
429            log::warn!("Attempting to get connection from uninitialized pool");
430        }
431
432        let start = Instant::now();
433
434        let timeout_duration = Duration::from_secs(self.config.clickhouse.connect_timeout_seconds);
435
436        for attempt in 0..3 {
437            match tokio::time::timeout(timeout_duration, self.pool.get()).await {
438                Ok(Ok(conn)) => {
439                    let duration = start.elapsed();
440
441                    if let Some(metrics) = &self.metrics {
442                        metrics.set_gauge_vec_mut(
443                            "clickhouse_connection_acquisition_seconds",
444                            &["success"],
445                            duration.as_secs_f64(),
446                        );
447                        metrics.inc_int_counter_vec_mut(
448                            "clickhouse_connection_acquisition_total",
449                            &["success"],
450                        );
451                    }
452
453                    log::debug!(
454                        "Connection acquired in {:?} (attempt {})",
455                        duration,
456                        attempt + 1
457                    );
458                    return Ok(conn);
459                }
460                Ok(Err(e)) => {
461                    if let Some(metrics) = &self.metrics {
462                        metrics.inc_int_counter_vec_mut(
463                            "clickhouse_connection_acquisition_total",
464                            &["failure"],
465                        );
466                    }
467
468                    log::warn!(
469                        "Failed to get connection from pool (attempt {}): {}",
470                        attempt + 1,
471                        e
472                    );
473
474                    if attempt >= 2 {
475                        return Err(ClickhouseError::Pool(e.to_string()));
476                    }
477                }
478                Err(_) => {
479                    if let Some(metrics) = &self.metrics {
480                        metrics.inc_int_counter_vec_mut(
481                            "clickhouse_connection_acquisition_total",
482                            &["timeout"],
483                        );
484                    }
485
486                    log::warn!(
487                        "Timed out waiting for connection (attempt {}) after {:?}",
488                        attempt + 1,
489                        timeout_duration
490                    );
491
492                    if attempt >= 2 {
493                        return Err(ClickhouseError::Timeout);
494                    }
495                }
496            }
497            let backoff = Duration::from_millis(50 * 2u64.pow(attempt));
498            tokio::time::sleep(backoff).await;
499        }
500        Err(ClickhouseError::Pool(
501            "Failed to get connection after retries".to_string(),
502        ))
503    }
504
505    pub async fn shutdown(&self) -> Result<(), ClickhouseError> {
506        log::info!("Initiating graceful shutdown of ClickHouse connection pool");
507
508        let pool_manager = self.pool.manager();
509        pool_manager.initiate_shutdown();
510
511        let status = self.pool.status();
512        log::info!(
513            "Connection pool status before shutdown: size={}, available={}, in_use={}",
514            status.size,
515            status.available,
516            status.size - status.available
517        );
518
519        let drain_timeout = Duration::from_secs(30);
520        let drain_start = Instant::now();
521
522        loop {
523            let status = self.pool.status();
524            let in_use = status.size - status.available;
525
526            if in_use == 0 {
527                log::info!("All connections returned to pool, proceeding with shutdown");
528                break;
529            }
530
531            if drain_start.elapsed() > drain_timeout {
532                log::warn!(
533                    "Shutdown drain timeout exceeded, {} connections still in use",
534                    in_use
535                );
536
537                break;
538            }
539
540            log::info!("Waiting for {} connections to be returned to pool", in_use);
541            tokio::time::sleep(Duration::from_secs(1)).await;
542        }
543
544        // Close all connections
545        self.pool.close();
546        log::info!("All connections closed");
547
548        log::info!("ClickHouse connection pool shutdown complete");
549
550        Ok(())
551    }
552
553    fn register_metrics(metrics: &SharedRegistrar) {
554        let metric_configs = [
555            // Connection
556            MetricConfig {
557                kind: Kind::IntCounterVec,
558                name: "clickhouse_connections_created_total",
559                help: "Total no. of connections created",
560                label_names: &["status"],
561            },
562            MetricConfig {
563                kind: Kind::IntGaugeVec,
564                name: "clickhouse_pool_connections",
565                help: "Current no. of connections in the pool",
566                label_names: &["state"],
567            },
568            MetricConfig {
569                kind: Kind::IntCounterVec,
570                name: "clickhouse_connetion_health_checks_total",
571                help: "Total no. of connection health checks",
572                label_names: &["status"],
573            },
574            MetricConfig {
575                kind: Kind::GaugeVec,
576                name: "clickhouse_connection_creation_seconds",
577                help: "Time taken to create connections",
578                label_names: &["operation"],
579            },
580            // Queries
581            MetricConfig {
582                kind: Kind::IntCounterVec,
583                name: "clickhouse_queries_total",
584                help: "Total no. of queries executed",
585                label_names: &["type"],
586            },
587            MetricConfig {
588                kind: Kind::IntCounterVec,
589                name: "clickhouse_query_errors_total",
590                help: "Total no. of query errors",
591                label_names: &["type"],
592            },
593            MetricConfig {
594                kind: Kind::GaugeVec,
595                name: "clickhouse_query_duration_seconds",
596                help: "Query execution time in seconds",
597                label_names: &["type"],
598            },
599            // Batch queries
600            MetricConfig {
601                kind: Kind::IntCounterVec,
602                name: "clickhouse_batch_query_errors_total",
603                help: "Total number of batch query errors",
604                label_names: &["type"],
605            },
606            MetricConfig {
607                kind: Kind::GaugeVec,
608                name: "clickhouse_batch_query_duration_seconds",
609                help: "Batch query execution time in seconds",
610                label_names: &["type"],
611            },
612            // Connection acquisition
613            MetricConfig {
614                kind: Kind::GaugeVec,
615                name: "clickhouse_connection_acquisition_seconds",
616                help: "Time taken to acquire a connection from the pool",
617                label_names: &["status"],
618            },
619            MetricConfig {
620                kind: Kind::IntCounterVec,
621                name: "clickhouse_connection_acquisition_total",
622                help: "Total number of connection acquisition attempts",
623                label_names: &["status"],
624            },
625            MetricConfig {
626                kind: Kind::IntCounterVec,
627                name: "clickhouse_connections_recycled_total",
628                help: "Total number of connections recycled",
629                label_names: &["reason"],
630            },
631            MetricConfig {
632                kind: Kind::GaugeVec,
633                name: "clickhouse_connection_recycling_seconds",
634                help: "Time taken for connection recycling",
635                label_names: &["operation"],
636            },
637        ];
638
639        metrics.with_metric_configs(&metric_configs).ok();
640    }
641
642    pub fn status(&self) -> PoolMetrics {
643        let status = self.pool.status();
644
645        PoolMetrics {
646            size: status.size,
647            available: status.available,
648            in_use: status.size - status.available,
649            max_size: status.max_size,
650            min_size: status.max_size,
651            waiters: status.waiting,
652        }
653    }
654}