// clickhouse_connection_pool/pool_manager.rs

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::batch_processor::{BatchCommand, BatchSender};
use crate::config::DatalakeConfig;
use crate::metrics::SharedRegistrar;
use crate::pool::{get_query_type, ClickhouseConnectionPool, ClickhouseError};
use crate::traits::{Model, PartitionKey};
use anyhow::Result;
use serde::de::DeserializeOwned;
use tokio::sync::mpsc;
use tokio::time::interval;

15#[derive(Clone)]
16pub struct PoolManager {
17    pool: Arc<ClickhouseConnectionPool>,
18    config: Arc<DatalakeConfig>,
19    metrics: Option<SharedRegistrar>,
20    last_recycle_time: u64,
21}
22
impl PoolManager {
24    pub async fn new(config: Arc<DatalakeConfig>, metrics: Option<SharedRegistrar>) -> Self {
25        let pool = Arc::new(ClickhouseConnectionPool::new(
26            config.clone(),
27            metrics.clone(),
28        ));
29
30        let _ = match pool.initialize().await {
31            Ok(_) => {
32                log::debug!("Pool warmed up and initialized successfully");
33            }
34            Err(e) => {
35                log::error!("Error warming up and initializing pool: {:?}", e);
36            }
37        };
38
39        Self {
40            pool,
41            config,
42            metrics,
43            last_recycle_time: 0,
44        }
45    }
46
47    pub fn get_pool(&self) -> Arc<ClickhouseConnectionPool> {
48        self.pool.clone()
49    }
50
51    pub fn seconds_since_last_recycle(&self) -> u64 {
52        let last = self.last_recycle_time;
53
54        if last == 0 {
55            return u64::MAX;
56        }
57
58        let now = std::time::SystemTime::now()
59            .duration_since(std::time::UNIX_EPOCH)
60            .unwrap_or_default()
61            .as_secs();
62
63        now.saturating_sub(last)
64    }
65
66    pub async fn refill_connection_pool(&self) -> Result<usize, ClickhouseError> {
67        let pool = self.get_pool();
68        let status = pool.status();
69
70        let current_total = status.size;
71        let target_total = self.config.clickhouse.max_connections as usize;
72        let deficit = target_total.saturating_sub(current_total);
73
74        if deficit == 0 {
75            log::info!("Deficit = 0");
76            return Ok(0);
77        }
78
79        let to_add = deficit;
80        log::info!("Attempting to add {} new connections to pool", to_add);
81
82        let mut added = 0;
83
84        for i in 0..to_add {
85            match pool.get_connection().await {
86                Ok(conn) => match conn.health_check().await {
87                    Ok(_) => {
88                        added += 1;
89                    }
90                    Err(e) => {
91                        log::warn!("New connection failed health check: {}", e);
92                    }
93                },
94                Err(e) => {
95                    log::error!(
96                        "Failed to create new connection {}/{}: {}",
97                        i + 1,
98                        to_add,
99                        e
100                    );
101                    tokio::time::sleep(Duration::from_millis(100)).await;
102                }
103            }
104        }
105
106        log::info!("Added {}/{} new connections to pool", added, to_add);
107        Ok(added)
108    }
109
110    pub async fn recycle_idle_connections(
111        &mut self,
112        max_to_recycle: usize,
113    ) -> Result<usize, ClickhouseError> {
114        log::info!(
115            "Starting connection recycling - checking up to {} connections",
116            max_to_recycle
117        );
118        let start = std::time::Instant::now();
119
120        let now = std::time::SystemTime::now()
121            .duration_since(std::time::UNIX_EPOCH)
122            .unwrap_or_default()
123            .as_secs();
124        self.last_recycle_time = now;
125
126        let status = self.pool.status();
127        log::info!("Clickhouse pool metrics: {:?}", status);
128
129        let to_check = std::cmp::min(max_to_recycle, status.available);
130
131        if to_check == 0 {
132            log::info!("No connections available for recycling");
133            return Ok(0);
134        }
135
136        log::info!(
137            "Checking {} connections out of {} available",
138            to_check,
139            status.available
140        );
141
142        let mut recycled = 0;
143
144        for _ in 0..to_check {
145            match self.pool.get_connection().await {
146                Ok(conn) => match conn.health_check().await {
147                    Ok(_) => {
148                        log::debug!(
149                            "Connection [id: {}] is healthy, returning to pool",
150                            conn.id()
151                        );
152                    }
153                    Err(e) => {
154                        log::warn!(
155                            "Connection [id: {}] failed health check: {}, will be recycled",
156                            conn.id(),
157                            e
158                        );
159                        recycled += 1;
160
161                        if let Some(metrics) = &self.metrics {
162                            metrics.inc_int_counter_vec_mut(
163                                "clickhouse_connections_recycled_total",
164                                &["health_check_failed"],
165                            );
166                        }
167                    }
168                },
169                Err(e) => {
170                    log::error!("Failed to get connection for health check: {}", e);
171                }
172            }
173        }
174
175        let duration = start.elapsed();
176        log::info!(
177            "Connection recycling complete: recycled={} in {:?}",
178            recycled,
179            duration
180        );
181
182        if let Some(metrics) = &self.metrics {
183            metrics.set_gauge_vec_mut(
184                "clickhouse_connection_recycling_seconds",
185                &["total"],
186                duration.as_secs_f64(),
187            );
188        }
189
190        Ok(recycled)
191    }
192
193    pub async fn execute_with_retry(&self, query: &str) -> Result<(), ClickhouseError> {
194        let mut attempt = 0;
195        let max_retries = self.config.retry.max_retries;
196
197        loop {
198            attempt += 1;
199
200            let conn = match self.pool.get_connection().await {
201                Ok(conn) => conn,
202                Err(e) => {
203                    if attempt > max_retries {
204                        log::error!("Failed to get connection after {} attempts: {}", attempt, e);
205                        return Err(e);
206                    }
207
208                    let backoff = self.config.retry.backoff_duration(attempt);
209                    log::warn!(
210                        "Failed to get connection (attempt {}/{}), retrying in {:?}: {}",
211                        attempt,
212                        max_retries,
213                        backoff,
214                        e
215                    );
216
217                    tokio::time::sleep(backoff).await;
218                    continue;
219                }
220            };
221
222            match conn.query(query).execute().await {
223                Ok(response) => {
224                    if let Some(metrics) = &self.metrics {
225                        metrics.inc_int_counter_vec_mut(
226                            "clickhouse_query_success_total",
227                            &[get_query_type(query)],
228                        );
229                    }
230
231                    return Ok(response);
232                }
233                Err(e) => {
234                    if attempt > max_retries {
235                        log::error!("Query failed after {} attempts: {}", attempt, e);
236                        return Err(ClickhouseError::Client(e));
237                    }
238
239                    let backoff = self.config.retry.backoff_duration(attempt);
240                    log::warn!(
241                        "Query failed (attempt {}/{}), retrying in {:?}: {}\nQuery: {}",
242                        attempt,
243                        max_retries,
244                        backoff,
245                        e,
246                        query
247                    );
248
249                    if let Some(metrics) = &self.metrics {
250                        metrics.inc_int_counter_vec_mut(
251                            "clickhouse_query_retries_total",
252                            &[get_query_type(query)],
253                        );
254                    }
255
256                    tokio::time::sleep(backoff).await;
257                }
258            }
259        }
260    }
261
262    pub async fn execute_select_with_retry<T>(&self, query: &str) -> Result<Vec<T>, ClickhouseError>
263    where
264        T: clickhouse::Row + DeserializeOwned + Send + 'static,
265    {
266        let mut attempt = 0;
267        let max_retries = self.config.retry.max_retries;
268
269        loop {
270            attempt += 1;
271
272            let conn = match self.pool.get_connection().await {
273                Ok(conn) => conn,
274                Err(e) => {
275                    if attempt > max_retries {
276                        log::error!("Failed to get connection after {} attempts: {}", attempt, e);
277                        return Err(e);
278                    }
279
280                    let backoff = self.config.retry.backoff_duration(attempt);
281                    log::warn!(
282                        "Failed to get connection (attempt {}/{}), retrying in {:?}: {}",
283                        attempt,
284                        max_retries,
285                        backoff,
286                        e
287                    );
288
289                    tokio::time::sleep(backoff).await;
290                    continue;
291                }
292            };
293
294            match conn.query(query).fetch_all::<T>().await {
295                Ok(response) => {
296                    if let Some(metrics) = &self.metrics {
297                        metrics.inc_int_counter_vec_mut(
298                            "clickhouse_query_success_total",
299                            &[get_query_type(query)],
300                        );
301                    }
302
303                    return Ok(response);
304                }
305                Err(e) => {
306                    if attempt > max_retries {
307                        log::error!("Query failed after {} attempts: {}", attempt, e);
308                        return Err(ClickhouseError::Client(e));
309                    }
310
311                    let backoff = self.config.retry.backoff_duration(attempt);
312                    log::warn!(
313                        "Query failed (attempt {}/{}), retrying in {:?}: {}\nQuery: {}",
314                        attempt,
315                        max_retries,
316                        backoff,
317                        e,
318                        query
319                    );
320
321                    if let Some(metrics) = &self.metrics {
322                        metrics.inc_int_counter_vec_mut(
323                            "clickhouse_query_retries_total",
324                            &[get_query_type(query)],
325                        );
326                    }
327
328                    tokio::time::sleep(backoff).await;
329                }
330            }
331        }
332    }
333
334    pub fn create_batch_processor<M>(
335        &self,
336        batch_size: usize,
337        max_wait_ms: u64,
338    ) -> BatchSender<M::T>
339    where
340        M: Model + Send + Sync + 'static,
341        M::T: Clone + Send + 'static,
342    {
343        let (tx, mut rx) = mpsc::channel(1000);
344
345        let pool_manager = self.clone();
346
347        tokio::spawn(async move {
348            let mut batch = Vec::with_capacity(batch_size);
349            let mut last_flush = Instant::now();
350            let mut flush_interval = interval(Duration::from_millis(100));
351
352            loop {
353                tokio::select! {
354                    cmd = rx.recv() => match cmd {
355                        Some(BatchCommand::Add(item)) => {
356                            batch.push(item);
357
358                            if batch.len() >= batch_size {
359                                Self::process_batch::<M>(&pool_manager, &mut batch).await;
360                                last_flush = Instant::now();
361                            }
362                        },
363                        Some(BatchCommand::Flush) => {
364                            if !batch.is_empty() {
365                                Self::process_batch::<M>(&pool_manager, &mut batch).await;
366                                last_flush = Instant::now();
367                            }
368                        },
369                        None => break,
370                    },
371
372                    _ = flush_interval.tick() => {
373                        if !batch.is_empty() && last_flush.elapsed() >= Duration::from_millis(max_wait_ms) {
374                            Self::process_batch::<M>(&pool_manager, &mut batch).await;
375                            last_flush = Instant::now();
376                        }
377                    }
378                }
379            }
380
381            if !batch.is_empty() {
382                Self::process_batch::<M>(&pool_manager, &mut batch).await;
383            }
384        });
385
386        BatchSender { tx }
387    }
388
389    async fn process_batch<M>(pool_manager: &PoolManager, batch: &mut Vec<M::T>)
390    where
391        M: Model,
392        M::T: Clone,
393    {
394        if batch.is_empty() {
395            return;
396        }
397
398        let items = std::mem::take(batch);
399
400        let query = M::batch_insert_query(&items);
401
402        match pool_manager.execute_with_retry(&query).await {
403            Ok(_) => {
404                log::info!(
405                    "Successfully inserted {} items into {}",
406                    items.len(),
407                    M::table_name()
408                );
409            }
410            Err(e) => {
411                log::error!("Error inserting batch into {}: {}", M::table_name(), e);
412                batch.extend(items);
413            }
414        }
415    }
416
417    pub fn create_partition_aware_batch_processor<M>(
418        &self,
419        batch_size: usize,
420        max_wait_ms: u64,
421        max_partitions_per_batch: usize,
422    ) -> BatchSender<M::T>
423    where
424        M: Model + Send + Sync + 'static,
425        M::T: Clone + Send + PartitionKey + 'static,
426    {
427        let (tx, mut rx) = mpsc::channel::<BatchCommand<M::T>>(1000);
428        let pool_manager = self.clone();
429
430        tokio::spawn(async move {
431            let mut partition_batches: HashMap<String, Vec<M::T>> = HashMap::new();
432            let mut last_flush = Instant::now();
433            let mut flush_interval = interval(Duration::from_millis(100));
434
435            loop {
436                tokio::select! {
437                    cmd = rx.recv() => match cmd {
438                        Some(BatchCommand::Add(item)) => {
439                            let partition_key = item.partition_key();
440                            let partition_batch = partition_batches.entry(partition_key).or_insert_with(Vec::new);
441                            partition_batch.push(item);
442
443                            let should_flush_size = partition_batches.values().any(|batch| batch.len() >= batch_size);
444                            
445                            let should_flush_partitions = partition_batches.len() >= max_partitions_per_batch;
446
447                            if should_flush_size || should_flush_partitions {
448                                Self::process_partition_batches::<M>(&pool_manager, &mut partition_batches).await;
449                                last_flush = Instant::now();
450                            }
451                        },
452                        Some(BatchCommand::Flush) => {
453                            if !partition_batches.is_empty() {
454                                Self::process_partition_batches::<M>(&pool_manager, &mut partition_batches).await;
455                                last_flush = Instant::now();
456                            }
457                        },
458                        None => break,
459                    },
460
461                    _ = flush_interval.tick() => {
462                        if !partition_batches.is_empty() && last_flush.elapsed() >= Duration::from_millis(max_wait_ms) {
463                            Self::process_partition_batches::<M>(&pool_manager, &mut partition_batches).await;
464                            last_flush = Instant::now();
465                        }
466                    }
467                }
468            }
469
470            if !partition_batches.is_empty() {
471                Self::process_partition_batches::<M>(&pool_manager, &mut partition_batches).await;
472            }
473        });
474
475        BatchSender { tx }
476    }
477
478    async fn process_partition_batches<M>(
479        pool_manager: &PoolManager, 
480        partition_batches: &mut HashMap<String, Vec<M::T>>
481    )
482    where
483        M: Model,
484        M::T: Clone,
485    {
486        if partition_batches.is_empty() {
487            return;
488        }
489
490        for (partition_key, batch) in partition_batches.drain() {
491            if batch.is_empty() {
492                continue;
493            }
494
495            let query = M::batch_insert_query(&batch);
496
497            match pool_manager.execute_with_retry(&query).await {
498                Ok(_) => {
499                    log::info!(
500                        "Successfully inserted {} items into {} for partition {}",
501                        batch.len(),
502                        M::table_name(),
503                        partition_key
504                    );
505                }
506                Err(e) => {
507                    log::error!(
508                        "Error inserting batch into {} for partition {}: {}", 
509                        M::table_name(), 
510                        partition_key, 
511                        e
512                    );
513                }
514            }
515        }
516    }
}