1#![cfg_attr(feature = "docs", feature(doc_cfg))]
80#![warn(missing_docs)]
81#![recursion_limit = "256"]
82mod config;
83
84mod error;
85mod metrics_utils;
86#[cfg(feature = "unstable")]
87#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
88pub mod runtime;
89mod spawn;
90mod time;
91
92pub use error::Error;
93
94pub use async_trait::async_trait;
95pub use config::Builder;
96use config::{Config, InternalConfig, ShareConfig};
97use futures_channel::mpsc::{self, Receiver, Sender};
98use futures_util::lock::{Mutex, MutexGuard};
99use futures_util::select;
100use futures_util::FutureExt;
101use futures_util::SinkExt;
102use futures_util::StreamExt;
103use metrics::{decrement_gauge, gauge, histogram, increment_counter, increment_gauge};
104pub use spawn::spawn;
105use std::fmt;
106use std::future::Future;
107use std::ops::{Deref, DerefMut};
108use std::sync::{
109 atomic::{AtomicU64, Ordering},
110 Arc, Weak,
111};
112use std::time::{Duration, Instant};
113#[doc(hidden)]
114pub use time::{delay_for, interval};
115use tokio::sync::Semaphore;
116
117use metrics_utils::{ACTIVE_CONNECTIONS, WAIT_COUNT, WAIT_DURATION};
118
119use crate::metrics_utils::{CLOSED_TOTAL, IDLE_CONNECTIONS, OPENED_TOTAL, OPEN_CONNECTIONS};
120
/// Number of semaphore permits the pool is created with when `max_open` is
/// configured as 0 (see `Pool::new_inner`).
const CONNECTION_REQUEST_QUEUE_SIZE: usize = 10000;
122
/// Connection-specific behavior that a [`Pool`] delegates to.
#[async_trait]
pub trait Manager: Send + Sync + 'static {
    /// The connection type this manager produces.
    type Connection: Send + 'static;
    /// The error type produced by [`Manager::connect`] and [`Manager::check`].
    type Error: Send + Sync + 'static;

    /// Spawns `task` in the background.
    ///
    /// The default implementation forwards to this crate's `spawn` function.
    /// The pool uses this hook to run the connection cleaner and to recycle
    /// connections when a [`Connection`] is dropped.
    fn spawn_task<T>(&self, task: T)
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        spawn(task);
    }

    /// Attempts to create a new connection.
    async fn connect(&self) -> Result<Self::Connection, Self::Error>;

    /// Checks a pooled connection before it is handed out again.
    ///
    /// The pool calls this on check-out when health checks are enabled and the
    /// health-check interval has elapsed. The connection is passed by value and
    /// must be returned on success; on error the pool discards it and opens a
    /// new connection instead.
    async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error>;

    /// Synchronously decides whether a connection being returned to the pool
    /// may be reused.
    ///
    /// Returning `false` makes the pool close the connection instead of keeping
    /// it idle. The default implementation accepts every connection.
    #[inline]
    fn validate(&self, _conn: &mut Self::Connection) -> bool {
        true
    }
}
155
/// State shared by every clone of a `Pool` (held behind an `Arc`).
struct SharedPool<M: Manager> {
    /// Settings read without taking the `internals` lock (timeouts, retry
    /// count, health-check options, cleaner rate).
    config: ShareConfig,
    /// User-supplied connection factory / validator.
    manager: M,
    /// Mutable pool state guarded by an async mutex.
    internals: Mutex<PoolInternals<M::Connection, M::Error>>,
    /// Lock-free counters.
    state: PoolState,
    /// Limits how many connections can be checked out at once: a permit is
    /// forgotten on check-out and added back when the connection is recycled.
    semaphore: Semaphore,
}
163
/// A raw connection together with the bookkeeping the pool keeps about it.
struct Conn<C, E> {
    /// The underlying connection; `None` once it has been taken out
    /// (by `Connection::into_inner` or during a failed health check).
    raw: Option<C>,
    // Never read in this file, hence the lint allowance.
    #[allow(dead_code)]
    last_err: Mutex<Option<E>>,
    /// When the connection was opened; compared against `max_lifetime`.
    created_at: Instant,
    /// When the connection was last handed out; compared against
    /// `max_idle_lifetime`.
    last_used_at: Instant,
    /// When the connection was opened or last passed `Manager::check`.
    last_checked_at: Instant,
    /// `true` until the connection has been recycled into the pool once.
    brand_new: bool,
}
173
174impl<C, E> Conn<C, E> {
175 fn close(&self, state: &PoolState) {
176 state.num_open.fetch_sub(1, Ordering::Relaxed);
177 state.max_idle_closed.fetch_add(1, Ordering::Relaxed);
178 decrement_gauge!(OPEN_CONNECTIONS, 1.0);
179 increment_counter!(CLOSED_TOTAL);
180 }
181
182 fn expired(&self, timeout: Option<Duration>) -> bool {
183 timeout
184 .and_then(|check_interval| {
185 Instant::now()
186 .checked_duration_since(self.created_at)
187 .map(|dur_since| dur_since >= check_interval)
188 })
189 .unwrap_or(false)
190 }
191
192 fn idle_expired(&self, timeout: Option<Duration>) -> bool {
193 timeout
194 .and_then(|check_interval| {
195 Instant::now()
196 .checked_duration_since(self.last_used_at)
197 .map(|dur_since| dur_since >= check_interval)
198 })
199 .unwrap_or(false)
200 }
201
202 fn needs_health_check(&self, timeout: Option<Duration>) -> bool {
203 timeout
204 .and_then(|check_interval| {
205 Instant::now()
206 .checked_duration_since(self.last_checked_at)
207 .map(|dur_since| dur_since >= check_interval)
208 })
209 .unwrap_or(true)
210 }
211}
212
/// Mutable pool state that is only accessed while holding
/// `SharedPool::internals`.
struct PoolInternals<C, E> {
    /// Settings that can change at runtime (`max_open`, `max_idle`,
    /// `max_lifetime`, ...).
    config: InternalConfig,
    /// Idle connections ready to be handed out; used as a stack (push/pop).
    free_conns: Vec<Conn<C, E>>,
    /// Total time callers have spent between requesting a connection and
    /// obtaining the internals lock.
    wait_duration: Duration,
    /// Sender used to wake the background cleaner; `None` while no cleaner
    /// task is running.
    cleaner_ch: Option<Sender<()>>,
}
219
/// Lock-free counters describing the pool.
struct PoolState {
    /// Number of connections currently open (idle or checked out).
    num_open: AtomicU64,
    /// Number of connections removed by the cleaner because they exceeded
    /// `max_lifetime`.
    max_lifetime_closed: AtomicU64,
    /// Incremented by `Conn::close`.
    max_idle_closed: AtomicU64,
    /// Number of callers currently waiting for a semaphore permit.
    wait_count: AtomicU64,
}
226
impl<C, E> Drop for PoolInternals<C, E> {
    /// Only emits a debug log line; the idle connections are dropped together
    /// with the `free_conns` field.
    fn drop(&mut self) {
        log::debug!("Pool internal drop");
    }
}
232
/// A generic connection pool.
///
/// The pool is a thin handle around shared state held in an `Arc`; clones
/// refer to the same underlying pool.
pub struct Pool<M: Manager>(Arc<SharedPool<M>>);
235
236impl<M: Manager> Clone for Pool<M> {
238 fn clone(&self) -> Self {
239 Pool(self.0.clone())
240 }
241}
242
/// A snapshot of the pool's configuration limits and counters, as returned by
/// `Pool::state`.
pub struct State {
    /// Configured maximum number of open connections.
    pub max_open: u64,

    /// Number of connections currently open, both in use and idle.
    pub connections: u64,
    /// Number of open connections that are not sitting idle in the pool.
    pub in_use: u64,
    /// Number of idle connections held by the pool.
    pub idle: u64,

    /// Number of callers currently waiting for a connection.
    pub wait_count: u64,
    /// Total time callers have spent waiting for a connection.
    pub wait_duration: Duration,
    /// Value of the pool's `max_idle_closed` counter.
    pub max_idle_closed: u64,
    /// Number of connections closed because they exceeded the maximum
    /// lifetime.
    pub max_lifetime_closed: u64,
}
266
267impl fmt::Debug for State {
268 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
269 fmt.debug_struct("Stats")
270 .field("max_open", &self.max_open)
271 .field("connections", &self.connections)
272 .field("in_use", &self.in_use)
273 .field("idle", &self.idle)
274 .field("wait_count", &self.wait_count)
275 .field("wait_duration", &self.wait_duration)
276 .field("max_idle_closed", &self.max_idle_closed)
277 .field("max_lifetime_closed", &self.max_lifetime_closed)
278 .finish()
279 }
280}
281
impl<M: Manager> Drop for Pool<M> {
    /// Performs no cleanup of its own; the shared state goes away when the
    /// inner `Arc` is dropped for the last time.
    fn drop(&mut self) {}
}
285
286impl<M: Manager> Pool<M> {
287 pub fn new(manager: M) -> Pool<M> {
289 Pool::builder().build(manager)
290 }
291
292 pub fn builder() -> Builder<M> {
294 Builder::new()
295 }
296
297 pub async fn set_max_open_conns(&self, n: u64) {
301 let mut internals = self.0.internals.lock().await;
302 internals.config.max_open = n;
303 if n > 0 && internals.config.max_idle > n {
304 drop(internals);
305 self.set_max_idle_conns(n).await;
306 }
307 }
308
309 pub async fn set_max_idle_conns(&self, n: u64) {
316 let mut internals = self.0.internals.lock().await;
317 internals.config.max_idle =
318 if internals.config.max_open > 0 && n > internals.config.max_open {
319 internals.config.max_open
320 } else {
321 n
322 };
323
324 let max_idle = internals.config.max_idle as usize;
325 if internals.free_conns.len() > max_idle {
326 let closing = internals.free_conns.split_off(max_idle);
327 for conn in closing {
328 conn.close(&self.0.state);
329 }
330 }
331 }
332
333 pub async fn set_conn_max_lifetime(&self, max_lifetime: Option<Duration>) {
344 assert_ne!(
345 max_lifetime,
346 Some(Duration::from_secs(0)),
347 "max_lifetime must be positive"
348 );
349 let mut internals = self.0.internals.lock().await;
350 internals.config.max_lifetime = max_lifetime;
351 if let Some(lifetime) = max_lifetime {
352 match internals.config.max_lifetime {
353 Some(prev) if lifetime < prev && internals.cleaner_ch.is_some() => {
354 let _ = internals.cleaner_ch.as_mut().unwrap().send(()).await;
356 }
357 _ => (),
358 }
359 }
360
361 if max_lifetime.is_some()
362 && self.0.state.num_open.load(Ordering::Relaxed) > 0
363 && internals.cleaner_ch.is_none()
364 {
365 log::debug!("run connection cleaner");
366 let shared1 = Arc::downgrade(&self.0);
367 let clean_rate = self.0.config.clean_rate;
368 let (cleaner_ch_sender, cleaner_ch) = mpsc::channel(1);
369 internals.cleaner_ch = Some(cleaner_ch_sender);
370 self.0.manager.spawn_task(async move {
371 connection_cleaner(shared1, cleaner_ch, clean_rate).await;
372 });
373 }
374 }
375
376 pub(crate) fn new_inner(manager: M, config: Config) -> Self {
377 let max_open = if config.max_open == 0 {
378 CONNECTION_REQUEST_QUEUE_SIZE
379 } else {
380 config.max_open as usize
381 };
382
383 gauge!(IDLE_CONNECTIONS, max_open as f64);
384
385 let (share_config, internal_config) = config.split();
386 let internals = Mutex::new(PoolInternals {
387 config: internal_config,
388 free_conns: Vec::new(),
389 wait_duration: Duration::from_secs(0),
390 cleaner_ch: None,
391 });
392
393 let pool_state = PoolState {
394 num_open: AtomicU64::new(0),
395 max_lifetime_closed: AtomicU64::new(0),
396 wait_count: AtomicU64::new(0),
397 max_idle_closed: AtomicU64::new(0),
398 };
399
400 let shared = Arc::new(SharedPool {
401 config: share_config,
402 manager,
403 internals,
404 semaphore: Semaphore::new(max_open),
405 state: pool_state,
406 });
407
408 Pool(shared)
409 }
410
411 pub async fn get(&self) -> Result<Connection<M>, Error<M::Error>> {
415 match self.0.config.get_timeout {
416 Some(duration) => self.get_timeout(duration).await,
417 None => self.inner_get_with_retries().await,
418 }
419 }
420
421 pub async fn get_timeout(&self, duration: Duration) -> Result<Connection<M>, Error<M::Error>> {
426 time::timeout(duration, self.inner_get_with_retries()).await
427 }
428
429 async fn inner_get_with_retries(&self) -> Result<Connection<M>, Error<M::Error>> {
430 let mut try_times: u32 = 0;
431 let config = &self.0.config;
432 loop {
433 try_times += 1;
434 match self.get_connection().await {
435 Ok(conn) => return Ok(conn),
436 Err(Error::BadConn) => {
437 if try_times == config.max_bad_conn_retries {
438 return self.get_connection().await;
439 }
440 continue;
441 }
442 Err(err) => return Err(err),
443 }
444 }
445 }
446
447 async fn get_connection(&self) -> Result<Connection<M>, Error<M::Error>> {
448 let mut c = self.get_or_create_conn().await?;
449 c.last_used_at = Instant::now();
450
451 let conn = Connection {
452 pool: Some(self.clone()),
453 conn: Some(c),
454 };
455
456 increment_gauge!(ACTIVE_CONNECTIONS, 1.0);
457 decrement_gauge!(WAIT_COUNT, 1.0);
458
459 Ok(conn)
460 }
461
462 async fn validate_conn(
463 &self,
464 internal_config: InternalConfig,
465 conn: &mut Conn<M::Connection, M::Error>,
466 ) -> bool {
467 if conn.brand_new {
468 return true;
469 }
470
471 if conn.expired(internal_config.max_lifetime) {
472 return false;
473 }
474
475 if conn.idle_expired(internal_config.max_idle_lifetime) {
476 return false;
477 }
478
479 let needs_health_check = self.0.config.health_check
480 && conn.needs_health_check(self.0.config.health_check_interval);
481
482 if needs_health_check {
483 let raw = conn.raw.take().unwrap();
484 match self.0.manager.check(raw).await {
485 Ok(raw) => {
486 conn.last_checked_at = Instant::now();
487 conn.raw = Some(raw)
488 }
489 Err(_e) => return false,
490 }
491 }
492 true
493 }
494
495 async fn get_or_create_conn(&self) -> Result<Conn<M::Connection, M::Error>, Error<M::Error>> {
496 self.0.state.wait_count.fetch_add(1, Ordering::Relaxed);
497 increment_gauge!(WAIT_COUNT, 1.0);
498 let wait_start = Instant::now();
499
500 let permit = self
501 .0
502 .semaphore
503 .acquire()
504 .await
505 .map_err(|_| Error::PoolClosed)?;
506
507 self.0.state.wait_count.fetch_sub(1, Ordering::SeqCst);
508
509 let mut internals = self.0.internals.lock().await;
510
511 internals.wait_duration += wait_start.elapsed();
512 histogram!(WAIT_DURATION, wait_start.elapsed());
513
514 let conn = internals.free_conns.pop();
515 let internal_config = internals.config.clone();
516 drop(internals);
517
518 if conn.is_some() {
519 let mut conn = conn.unwrap();
520 if self.validate_conn(internal_config, &mut conn).await {
521 decrement_gauge!(IDLE_CONNECTIONS, 1.0);
522 permit.forget();
523 return Ok(conn);
524 } else {
525 conn.close(&self.0.state);
526 }
527 }
528
529 let create_r = self.open_new_connection().await;
530
531 if create_r.is_ok() {
532 decrement_gauge!(IDLE_CONNECTIONS, 1.0);
533 permit.forget();
534 }
535
536 create_r
537 }
538
539 async fn open_new_connection(&self) -> Result<Conn<M::Connection, M::Error>, Error<M::Error>> {
540 log::debug!("creating new connection from manager");
541 match self.0.manager.connect().await {
542 Ok(c) => {
543 self.0.state.num_open.fetch_add(1, Ordering::Relaxed);
544 increment_gauge!(OPENED_TOTAL, 1.0);
545 increment_counter!(OPEN_CONNECTIONS);
546
547 let conn = Conn {
548 raw: Some(c),
549 last_err: Mutex::new(None),
550 created_at: Instant::now(),
551 last_used_at: Instant::now(),
552 last_checked_at: Instant::now(),
553 brand_new: true,
554 };
555
556 Ok(conn)
557 }
558 Err(e) => Err(Error::Inner(e)),
559 }
560 }
561
562 pub async fn state(&self) -> State {
566 let internals = self.0.internals.lock().await;
567 let num_free_conns = internals.free_conns.len() as u64;
568 let wait_duration = internals.wait_duration;
569 let max_open = internals.config.max_open;
570 drop(internals);
571 State {
572 max_open,
573
574 connections: self.0.state.num_open.load(Ordering::Relaxed),
575 in_use: self.0.state.num_open.load(Ordering::Relaxed) - num_free_conns,
576 idle: num_free_conns,
577
578 wait_count: self.0.state.wait_count.load(Ordering::Relaxed),
579 wait_duration,
580 max_idle_closed: self.0.state.max_idle_closed.load(Ordering::Relaxed),
581 max_lifetime_closed: self.0.state.max_lifetime_closed.load(Ordering::Relaxed),
582 }
583 }
584}
585
586async fn recycle_conn<M: Manager>(
587 shared: &Arc<SharedPool<M>>,
588 mut conn: Conn<M::Connection, M::Error>,
589) {
590 if conn_still_valid(shared, &mut conn) {
591 conn.brand_new = false;
592 let internals = shared.internals.lock().await;
593 put_idle_conn(shared, internals, conn);
594 } else {
595 conn.close(&shared.state);
596 }
597
598 shared.semaphore.add_permits(1);
599}
600
601fn conn_still_valid<M: Manager>(
602 shared: &Arc<SharedPool<M>>,
603 conn: &mut Conn<M::Connection, M::Error>,
604) -> bool {
605 if conn.raw.is_none() {
606 return false;
607 }
608
609 if !shared.manager.validate(conn.raw.as_mut().unwrap()) {
610 log::debug!("bad conn when check in");
611 return false;
612 }
613
614 true
615}
616
617fn put_idle_conn<M: Manager>(
618 shared: &Arc<SharedPool<M>>,
619 mut internals: MutexGuard<'_, PoolInternals<M::Connection, M::Error>>,
620 conn: Conn<M::Connection, M::Error>,
621) {
622 if internals.config.max_idle > internals.free_conns.len() as u64 {
623 internals.free_conns.push(conn);
624 drop(internals);
625 } else {
626 conn.close(&shared.state);
627 }
628}
629
630async fn connection_cleaner<M: Manager>(
631 shared: Weak<SharedPool<M>>,
632 mut cleaner_ch: Receiver<()>,
633 clean_rate: Duration,
634) {
635 let mut interval = interval(clean_rate);
636 interval.tick().await;
637 loop {
638 select! {
639 _ = interval.tick().fuse() => (),
640 r = cleaner_ch.next().fuse() => match r{
641 Some(()) => (),
642 None=> return
643 },
644 }
645
646 if !clean_connection(&shared).await {
647 return;
648 }
649 }
650}
651
652async fn clean_connection<M: Manager>(shared: &Weak<SharedPool<M>>) -> bool {
653 let shared = match shared.upgrade() {
654 Some(shared) => shared,
655 None => {
656 log::debug!("Failed to clean connections");
657 return false;
658 }
659 };
660
661 log::debug!("Clean connections");
662
663 let mut internals = shared.internals.lock().await;
664 if shared.state.num_open.load(Ordering::Relaxed) == 0 || internals.config.max_lifetime.is_none()
665 {
666 internals.cleaner_ch.take();
667 return false;
668 }
669
670 let expired = Instant::now() - internals.config.max_lifetime.unwrap();
671 let mut closing = vec![];
672
673 let mut i = 0;
674 log::debug!(
675 "clean connections, idle conns {}",
676 internals.free_conns.len()
677 );
678
679 loop {
680 if i >= internals.free_conns.len() {
681 break;
682 }
683
684 if internals.free_conns[i].created_at < expired {
685 let c = internals.free_conns.swap_remove(i);
686 closing.push(c);
687 continue;
688 }
689 i += 1;
690 }
691 drop(internals);
692
693 shared
694 .state
695 .max_lifetime_closed
696 .fetch_add(closing.len() as u64, Ordering::Relaxed);
697 for conn in closing {
698 conn.close(&shared.state);
699 }
700 true
701}
702
/// A connection checked out of a [`Pool`].
///
/// Dereferences to the manager's connection type. When dropped, the
/// connection is handed back to the pool in a task spawned through
/// `Manager::spawn_task`.
pub struct Connection<M: Manager> {
    // Both fields are `Option`s so that `Drop` can move the values out.
    pool: Option<Pool<M>>,
    conn: Option<Conn<M::Connection, M::Error>>,
}
708
709impl<M: Manager> Connection<M> {
710 pub fn is_brand_new(&self) -> bool {
712 self.conn.as_ref().unwrap().brand_new
713 }
714
715 pub fn into_inner(mut self) -> M::Connection {
717 self.conn.as_mut().unwrap().raw.take().unwrap()
718 }
719}
720
721impl<M: Manager> Drop for Connection<M> {
722 fn drop(&mut self) {
723 let pool = self.pool.take().unwrap();
724 let conn = self.conn.take().unwrap();
725
726 decrement_gauge!(ACTIVE_CONNECTIONS, 1.0);
727 increment_gauge!(IDLE_CONNECTIONS, 1.0);
728 pool.clone().0.manager.spawn_task(async move {
730 recycle_conn(&pool.0, conn).await;
731 });
732 }
733}
734
735impl<M: Manager> Deref for Connection<M> {
736 type Target = M::Connection;
737 fn deref(&self) -> &Self::Target {
738 self.conn.as_ref().unwrap().raw.as_ref().unwrap()
739 }
740}
741
742impl<M: Manager> DerefMut for Connection<M> {
743 fn deref_mut(&mut self) -> &mut M::Connection {
744 self.conn.as_mut().unwrap().raw.as_mut().unwrap()
745 }
746}