clickhouse_connection_pool/
pool_manager.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use crate::batch_processor::{BatchCommand, BatchSender};
6use crate::config::DatalakeConfig;
7use crate::metrics::SharedRegistrar;
8use crate::pool::{get_query_type, ClickhouseConnectionPool, ClickhouseError};
9use crate::traits::{Model, PartitionKey};
10use anyhow::Result;
11use serde::de::DeserializeOwned;
12use tokio::sync::mpsc;
13use tokio::time::interval;
14
15#[derive(Clone)]
16pub struct PoolManager {
17 pool: Arc<ClickhouseConnectionPool>,
18 config: Arc<DatalakeConfig>,
19 metrics: Option<SharedRegistrar>,
20 last_recycle_time: u64,
21}
22
23impl PoolManager {
24 pub async fn new(config: Arc<DatalakeConfig>, metrics: Option<SharedRegistrar>) -> Self {
25 let pool = Arc::new(ClickhouseConnectionPool::new(
26 config.clone(),
27 metrics.clone(),
28 ));
29
30 let _ = match pool.initialize().await {
31 Ok(_) => {
32 log::debug!("Pool warmed up and initialized successfully");
33 }
34 Err(e) => {
35 log::error!("Error warming up and initializing pool: {:?}", e);
36 }
37 };
38
39 Self {
40 pool,
41 config,
42 metrics,
43 last_recycle_time: 0,
44 }
45 }
46
47 pub fn get_pool(&self) -> Arc<ClickhouseConnectionPool> {
48 self.pool.clone()
49 }
50
51 pub fn seconds_since_last_recycle(&self) -> u64 {
52 let last = self.last_recycle_time;
53
54 if last == 0 {
55 return u64::MAX;
56 }
57
58 let now = std::time::SystemTime::now()
59 .duration_since(std::time::UNIX_EPOCH)
60 .unwrap_or_default()
61 .as_secs();
62
63 now.saturating_sub(last)
64 }
65
66 pub async fn refill_connection_pool(&self) -> Result<usize, ClickhouseError> {
67 let pool = self.get_pool();
68 let status = pool.status();
69
70 let current_total = status.size;
71 let target_total = self.config.clickhouse.max_connections as usize;
72 let deficit = target_total.saturating_sub(current_total);
73
74 if deficit == 0 {
75 log::info!("Deficit = 0");
76 return Ok(0);
77 }
78
79 let to_add = deficit;
80 log::info!("Attempting to add {} new connections to pool", to_add);
81
82 let mut added = 0;
83
84 for i in 0..to_add {
85 match pool.get_connection().await {
86 Ok(conn) => match conn.health_check().await {
87 Ok(_) => {
88 added += 1;
89 }
90 Err(e) => {
91 log::warn!("New connection failed health check: {}", e);
92 }
93 },
94 Err(e) => {
95 log::error!(
96 "Failed to create new connection {}/{}: {}",
97 i + 1,
98 to_add,
99 e
100 );
101 tokio::time::sleep(Duration::from_millis(100)).await;
102 }
103 }
104 }
105
106 log::info!("Added {}/{} new connections to pool", added, to_add);
107 Ok(added)
108 }
109
110 pub async fn recycle_idle_connections(
111 &mut self,
112 max_to_recycle: usize,
113 ) -> Result<usize, ClickhouseError> {
114 log::info!(
115 "Starting connection recycling - checking up to {} connections",
116 max_to_recycle
117 );
118 let start = std::time::Instant::now();
119
120 let now = std::time::SystemTime::now()
121 .duration_since(std::time::UNIX_EPOCH)
122 .unwrap_or_default()
123 .as_secs();
124 self.last_recycle_time = now;
125
126 let status = self.pool.status();
127 log::info!("Clickhouse pool metrics: {:?}", status);
128
129 let to_check = std::cmp::min(max_to_recycle, status.available);
130
131 if to_check == 0 {
132 log::info!("No connections available for recycling");
133 return Ok(0);
134 }
135
136 log::info!(
137 "Checking {} connections out of {} available",
138 to_check,
139 status.available
140 );
141
142 let mut recycled = 0;
143
144 for _ in 0..to_check {
145 match self.pool.get_connection().await {
146 Ok(conn) => match conn.health_check().await {
147 Ok(_) => {
148 log::debug!(
149 "Connection [id: {}] is healthy, returning to pool",
150 conn.id()
151 );
152 }
153 Err(e) => {
154 log::warn!(
155 "Connection [id: {}] failed health check: {}, will be recycled",
156 conn.id(),
157 e
158 );
159 recycled += 1;
160
161 if let Some(metrics) = &self.metrics {
162 metrics.inc_int_counter_vec_mut(
163 "clickhouse_connections_recycled_total",
164 &["health_check_failed"],
165 );
166 }
167 }
168 },
169 Err(e) => {
170 log::error!("Failed to get connection for health check: {}", e);
171 }
172 }
173 }
174
175 let duration = start.elapsed();
176 log::info!(
177 "Connection recycling complete: recycled={} in {:?}",
178 recycled,
179 duration
180 );
181
182 if let Some(metrics) = &self.metrics {
183 metrics.set_gauge_vec_mut(
184 "clickhouse_connection_recycling_seconds",
185 &["total"],
186 duration.as_secs_f64(),
187 );
188 }
189
190 Ok(recycled)
191 }
192
193 pub async fn execute_with_retry(&self, query: &str) -> Result<(), ClickhouseError> {
194 let mut attempt = 0;
195 let max_retries = self.config.retry.max_retries;
196
197 loop {
198 attempt += 1;
199
200 let conn = match self.pool.get_connection().await {
201 Ok(conn) => conn,
202 Err(e) => {
203 if attempt > max_retries {
204 log::error!("Failed to get connection after {} attempts: {}", attempt, e);
205 return Err(e);
206 }
207
208 let backoff = self.config.retry.backoff_duration(attempt);
209 log::warn!(
210 "Failed to get connection (attempt {}/{}), retrying in {:?}: {}",
211 attempt,
212 max_retries,
213 backoff,
214 e
215 );
216
217 tokio::time::sleep(backoff).await;
218 continue;
219 }
220 };
221
222 match conn.query(query).execute().await {
223 Ok(response) => {
224 if let Some(metrics) = &self.metrics {
225 metrics.inc_int_counter_vec_mut(
226 "clickhouse_query_success_total",
227 &[get_query_type(query)],
228 );
229 }
230
231 return Ok(response);
232 }
233 Err(e) => {
234 if attempt > max_retries {
235 log::error!("Query failed after {} attempts: {}", attempt, e);
236 return Err(ClickhouseError::Client(e));
237 }
238
239 let backoff = self.config.retry.backoff_duration(attempt);
240 log::warn!(
241 "Query failed (attempt {}/{}), retrying in {:?}: {}\nQuery: {}",
242 attempt,
243 max_retries,
244 backoff,
245 e,
246 query
247 );
248
249 if let Some(metrics) = &self.metrics {
250 metrics.inc_int_counter_vec_mut(
251 "clickhouse_query_retries_total",
252 &[get_query_type(query)],
253 );
254 }
255
256 tokio::time::sleep(backoff).await;
257 }
258 }
259 }
260 }
261
262 pub async fn execute_select_with_retry<T>(&self, query: &str) -> Result<Vec<T>, ClickhouseError>
263 where
264 T: clickhouse::Row + DeserializeOwned + Send + 'static,
265 {
266 let mut attempt = 0;
267 let max_retries = self.config.retry.max_retries;
268
269 loop {
270 attempt += 1;
271
272 let conn = match self.pool.get_connection().await {
273 Ok(conn) => conn,
274 Err(e) => {
275 if attempt > max_retries {
276 log::error!("Failed to get connection after {} attempts: {}", attempt, e);
277 return Err(e);
278 }
279
280 let backoff = self.config.retry.backoff_duration(attempt);
281 log::warn!(
282 "Failed to get connection (attempt {}/{}), retrying in {:?}: {}",
283 attempt,
284 max_retries,
285 backoff,
286 e
287 );
288
289 tokio::time::sleep(backoff).await;
290 continue;
291 }
292 };
293
294 match conn.query(query).fetch_all::<T>().await {
295 Ok(response) => {
296 if let Some(metrics) = &self.metrics {
297 metrics.inc_int_counter_vec_mut(
298 "clickhouse_query_success_total",
299 &[get_query_type(query)],
300 );
301 }
302
303 return Ok(response);
304 }
305 Err(e) => {
306 if attempt > max_retries {
307 log::error!("Query failed after {} attempts: {}", attempt, e);
308 return Err(ClickhouseError::Client(e));
309 }
310
311 let backoff = self.config.retry.backoff_duration(attempt);
312 log::warn!(
313 "Query failed (attempt {}/{}), retrying in {:?}: {}\nQuery: {}",
314 attempt,
315 max_retries,
316 backoff,
317 e,
318 query
319 );
320
321 if let Some(metrics) = &self.metrics {
322 metrics.inc_int_counter_vec_mut(
323 "clickhouse_query_retries_total",
324 &[get_query_type(query)],
325 );
326 }
327
328 tokio::time::sleep(backoff).await;
329 }
330 }
331 }
332 }
333
334 pub fn create_batch_processor<M>(
335 &self,
336 batch_size: usize,
337 max_wait_ms: u64,
338 ) -> BatchSender<M::T>
339 where
340 M: Model + Send + Sync + 'static,
341 M::T: Clone + Send + 'static,
342 {
343 let (tx, mut rx) = mpsc::channel(1000);
344
345 let pool_manager = self.clone();
346
347 tokio::spawn(async move {
348 let mut batch = Vec::with_capacity(batch_size);
349 let mut last_flush = Instant::now();
350 let mut flush_interval = interval(Duration::from_millis(100));
351
352 loop {
353 tokio::select! {
354 cmd = rx.recv() => match cmd {
355 Some(BatchCommand::Add(item)) => {
356 batch.push(item);
357
358 if batch.len() >= batch_size {
359 Self::process_batch::<M>(&pool_manager, &mut batch).await;
360 last_flush = Instant::now();
361 }
362 },
363 Some(BatchCommand::Flush) => {
364 if !batch.is_empty() {
365 Self::process_batch::<M>(&pool_manager, &mut batch).await;
366 last_flush = Instant::now();
367 }
368 },
369 None => break,
370 },
371
372 _ = flush_interval.tick() => {
373 if !batch.is_empty() && last_flush.elapsed() >= Duration::from_millis(max_wait_ms) {
374 Self::process_batch::<M>(&pool_manager, &mut batch).await;
375 last_flush = Instant::now();
376 }
377 }
378 }
379 }
380
381 if !batch.is_empty() {
382 Self::process_batch::<M>(&pool_manager, &mut batch).await;
383 }
384 });
385
386 BatchSender { tx }
387 }
388
389 async fn process_batch<M>(pool_manager: &PoolManager, batch: &mut Vec<M::T>)
390 where
391 M: Model,
392 M::T: Clone,
393 {
394 if batch.is_empty() {
395 return;
396 }
397
398 let items = std::mem::take(batch);
399
400 let query = M::batch_insert_query(&items);
401
402 match pool_manager.execute_with_retry(&query).await {
403 Ok(_) => {
404 log::info!(
405 "Successfully inserted {} items into {}",
406 items.len(),
407 M::table_name()
408 );
409 }
410 Err(e) => {
411 log::error!("Error inserting batch into {}: {}", M::table_name(), e);
412 batch.extend(items);
413 }
414 }
415 }
416
417 pub fn create_partition_aware_batch_processor<M>(
418 &self,
419 batch_size: usize,
420 max_wait_ms: u64,
421 max_partitions_per_batch: usize,
422 ) -> BatchSender<M::T>
423 where
424 M: Model + Send + Sync + 'static,
425 M::T: Clone + Send + PartitionKey + 'static,
426 {
427 let (tx, mut rx) = mpsc::channel::<BatchCommand<M::T>>(1000);
428 let pool_manager = self.clone();
429
430 tokio::spawn(async move {
431 let mut partition_batches: HashMap<String, Vec<M::T>> = HashMap::new();
432 let mut last_flush = Instant::now();
433 let mut flush_interval = interval(Duration::from_millis(100));
434
435 loop {
436 tokio::select! {
437 cmd = rx.recv() => match cmd {
438 Some(BatchCommand::Add(item)) => {
439 let partition_key = item.partition_key();
440 let partition_batch = partition_batches.entry(partition_key).or_insert_with(Vec::new);
441 partition_batch.push(item);
442
443 let should_flush_size = partition_batches.values().any(|batch| batch.len() >= batch_size);
444
445 let should_flush_partitions = partition_batches.len() >= max_partitions_per_batch;
446
447 if should_flush_size || should_flush_partitions {
448 Self::process_partition_batches::<M>(&pool_manager, &mut partition_batches).await;
449 last_flush = Instant::now();
450 }
451 },
452 Some(BatchCommand::Flush) => {
453 if !partition_batches.is_empty() {
454 Self::process_partition_batches::<M>(&pool_manager, &mut partition_batches).await;
455 last_flush = Instant::now();
456 }
457 },
458 None => break,
459 },
460
461 _ = flush_interval.tick() => {
462 if !partition_batches.is_empty() && last_flush.elapsed() >= Duration::from_millis(max_wait_ms) {
463 Self::process_partition_batches::<M>(&pool_manager, &mut partition_batches).await;
464 last_flush = Instant::now();
465 }
466 }
467 }
468 }
469
470 if !partition_batches.is_empty() {
471 Self::process_partition_batches::<M>(&pool_manager, &mut partition_batches).await;
472 }
473 });
474
475 BatchSender { tx }
476 }
477
478 async fn process_partition_batches<M>(
479 pool_manager: &PoolManager,
480 partition_batches: &mut HashMap<String, Vec<M::T>>
481 )
482 where
483 M: Model,
484 M::T: Clone,
485 {
486 if partition_batches.is_empty() {
487 return;
488 }
489
490 for (partition_key, batch) in partition_batches.drain() {
491 if batch.is_empty() {
492 continue;
493 }
494
495 let query = M::batch_insert_query(&batch);
496
497 match pool_manager.execute_with_retry(&query).await {
498 Ok(_) => {
499 log::info!(
500 "Successfully inserted {} items into {} for partition {}",
501 batch.len(),
502 M::table_name(),
503 partition_key
504 );
505 }
506 Err(e) => {
507 log::error!(
508 "Error inserting batch into {} for partition {}: {}",
509 M::table_name(),
510 partition_key,
511 e
512 );
513 }
514 }
515 }
516 }
517}