mobc_forked/
lib.rs

1//! A generic connection pool with async/await support.
2//!
3//! Opening a new database connection every time one is needed is both
4//! inefficient and can lead to resource exhaustion under high traffic
5//! conditions. A connection pool maintains a set of open connections to a
6//! database, handing them out for repeated use.
7//!
8//! mobc is agnostic to the connection type it is managing. Implementors of the
9//! `Manager` trait provide the database-specific logic to create and
10//! check the health of connections.
11//!
12//! # Example
13//!
14//! Using an imaginary "foodb" database.
15//!
16//! ```rust
17//!use mobc::{Manager, Pool, async_trait};
18//!
19//!#[derive(Debug)]
20//!struct FooError;
21//!
22//!struct FooConnection;
23//!
24//!impl FooConnection {
25//!    async fn query(&self) -> String {
26//!        "nori".to_string()
27//!    }
28//!}
29//!
30//!struct FooManager;
31//!
32//!#[async_trait]
33//!impl Manager for FooManager {
34//!    type Connection = FooConnection;
35//!    type Error = FooError;
36//!
37//!    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
38//!        Ok(FooConnection)
39//!    }
40//!
41//!    async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
42//!        Ok(conn)
43//!    }
44//!}
45//!
46//!#[tokio::main]
47//!async fn main() {
48//!    let pool = Pool::builder().max_open(15).build(FooManager);
49//!    let num: usize = 10000;
50//!    let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(16);
51//!
52//!    for _ in 0..num {
53//!        let pool = pool.clone();
54//!        let mut tx = tx.clone();
55//!        tokio::spawn(async move {
56//!            let conn = pool.get().await.unwrap();
57//!            let name = conn.query().await;
58//!            assert_eq!(name, "nori".to_string());
59//!            tx.send(()).await.unwrap();
60//!        });
61//!    }
62//!
63//!    for _ in 0..num {
64//!        rx.recv().await.unwrap();
65//!    }
66//!}
67//! ```
68//!
69//! # Metrics
70//!
//! Mobc uses the `metrics` crate to expose the following metrics:
//!
//! 1. Active Connections - The number of connections in use.
//! 1. Idle Connections - The number of connections that are not being used.
//! 1. Wait Count - The number of callers waiting for a connection.
//! 1. Wait Duration - A cumulative histogram of the wait time for a connection.
77//!
78
79#![cfg_attr(feature = "docs", feature(doc_cfg))]
80#![warn(missing_docs)]
81#![recursion_limit = "256"]
82mod config;
83
84mod error;
85mod metrics_utils;
86#[cfg(feature = "unstable")]
87#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
88pub mod runtime;
89mod spawn;
90mod time;
91
92pub use error::Error;
93
94pub use async_trait::async_trait;
95pub use config::Builder;
96use config::{Config, InternalConfig, ShareConfig};
97use futures_channel::mpsc::{self, Receiver, Sender};
98use futures_util::lock::{Mutex, MutexGuard};
99use futures_util::select;
100use futures_util::FutureExt;
101use futures_util::SinkExt;
102use futures_util::StreamExt;
103use metrics::{decrement_gauge, gauge, histogram, increment_counter, increment_gauge};
104pub use spawn::spawn;
105use std::fmt;
106use std::future::Future;
107use std::ops::{Deref, DerefMut};
108use std::sync::{
109    atomic::{AtomicU64, Ordering},
110    Arc, Weak,
111};
112use std::time::{Duration, Instant};
113#[doc(hidden)]
114pub use time::{delay_for, interval};
115use tokio::sync::Semaphore;
116
117use metrics_utils::{ACTIVE_CONNECTIONS, WAIT_COUNT, WAIT_DURATION};
118
119use crate::metrics_utils::{CLOSED_TOTAL, IDLE_CONNECTIONS, OPENED_TOTAL, OPEN_CONNECTIONS};
120
/// Number of semaphore permits used when `max_open` is configured as `0`.
///
/// `Pool::new_inner` substitutes this value for a `max_open` of `0`, so the
/// semaphore it creates always has a finite number of permits.
const CONNECTION_REQUEST_QUEUE_SIZE: usize = 10000;
122
#[async_trait]
/// A trait which provides connection-specific functionality.
///
/// The pool calls into a `Manager` to open connections, to health-check them
/// on check-out, to validate them on check-in, and to spawn its background
/// tasks.
pub trait Manager: Send + Sync + 'static {
    /// The connection type this manager deals with.
    type Connection: Send + 'static;
    /// The error type returned by `Connection`s.
    type Error: Send + Sync + 'static;

    /// Spawns a new asynchronous task.
    ///
    /// The pool uses this to run the connection cleaner and to return
    /// connections to the pool when a `Connection` is dropped. The default
    /// implementation forwards to the crate-level [`spawn`] function.
    fn spawn_task<T>(&self, task: T)
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        spawn(task);
    }

    /// Attempts to create a new connection.
    ///
    /// An error returned here is surfaced to the caller of `Pool::get` wrapped
    /// in `Error::Inner`.
    async fn connect(&self) -> Result<Self::Connection, Self::Error>;

    /// Determines if the connection is still connected to the database when check-out.
    ///
    /// A standard implementation would check if a simple query like `SELECT 1`
    /// succeeds.
    ///
    /// The connection is passed by value and must be handed back in the `Ok`
    /// case. When an error is returned the pool discards the connection and
    /// opens a new one instead.
    async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error>;

    /// *Quickly* determines a connection is still valid when check-in.
    ///
    /// This is called synchronously while a connection is being returned to
    /// the pool; returning `false` causes the connection to be closed instead
    /// of being kept idle. The default implementation accepts every
    /// connection.
    #[inline]
    fn validate(&self, _conn: &mut Self::Connection) -> bool {
        true
    }
}
155
/// State shared by every clone of a `Pool` (held behind an `Arc`).
struct SharedPool<M: Manager> {
    /// Settings that are read without taking the `internals` lock
    /// (timeouts, health-check options, cleaner rate, retry count).
    config: ShareConfig,
    /// User-supplied connection factory / validator / task spawner.
    manager: M,
    /// Mutable pool data: adjustable limits, the idle list and cleaner handle.
    internals: Mutex<PoolInternals<M::Connection, M::Error>>,
    /// Lock-free counters.
    state: PoolState,
    /// Limits checked-out connections: a permit is acquired (and forgotten)
    /// for every connection handed out, and one permit is added back when a
    /// connection is recycled.
    semaphore: Semaphore,
}
163
/// A raw connection plus the bookkeeping the pool needs to manage it.
struct Conn<C, E> {
    /// The underlying connection. `None` after it has been taken out, either
    /// by `Connection::into_inner` or by a health check that failed.
    raw: Option<C>,
    // Never read anywhere in this file, hence the lint suppression.
    #[allow(dead_code)]
    last_err: Mutex<Option<E>>,
    /// When the connection was opened; compared against `max_lifetime`.
    created_at: Instant,
    /// When the connection was last handed out; compared against
    /// `max_idle_lifetime`.
    last_used_at: Instant,
    /// When the connection last passed `Manager::check` (or was opened).
    last_checked_at: Instant,
    /// `true` from creation until the connection is recycled for the first time.
    brand_new: bool,
}
173
174impl<C, E> Conn<C, E> {
175    fn close(&self, state: &PoolState) {
176        state.num_open.fetch_sub(1, Ordering::Relaxed);
177        state.max_idle_closed.fetch_add(1, Ordering::Relaxed);
178        decrement_gauge!(OPEN_CONNECTIONS, 1.0);
179        increment_counter!(CLOSED_TOTAL);
180    }
181
182    fn expired(&self, timeout: Option<Duration>) -> bool {
183        timeout
184            .and_then(|check_interval| {
185                Instant::now()
186                    .checked_duration_since(self.created_at)
187                    .map(|dur_since| dur_since >= check_interval)
188            })
189            .unwrap_or(false)
190    }
191
192    fn idle_expired(&self, timeout: Option<Duration>) -> bool {
193        timeout
194            .and_then(|check_interval| {
195                Instant::now()
196                    .checked_duration_since(self.last_used_at)
197                    .map(|dur_since| dur_since >= check_interval)
198            })
199            .unwrap_or(false)
200    }
201
202    fn needs_health_check(&self, timeout: Option<Duration>) -> bool {
203        timeout
204            .and_then(|check_interval| {
205                Instant::now()
206                    .checked_duration_since(self.last_checked_at)
207                    .map(|dur_since| dur_since >= check_interval)
208            })
209            .unwrap_or(true)
210    }
211}
212
/// The part of the pool that is protected by the async mutex.
struct PoolInternals<C, E> {
    /// Limits that can be changed at runtime through the `Pool::set_*` methods.
    config: InternalConfig,
    /// Idle connections; taken from the end on check-out, pushed on check-in.
    free_conns: Vec<Conn<C, E>>,
    /// Cumulative time callers spent waiting before they could take a connection.
    wait_duration: Duration,
    /// Sender used to wake the connection cleaner; `Some` once a cleaner task
    /// has been started and until that task clears it.
    cleaner_ch: Option<Sender<()>>,
}
219
/// Lock-free counters describing the pool.
struct PoolState {
    /// Connections currently open (checked out or idle).
    num_open: AtomicU64,
    /// Connections removed by the cleaner because they outlived `max_lifetime`.
    max_lifetime_closed: AtomicU64,
    /// Incremented by every `Conn::close` call.
    max_idle_closed: AtomicU64,
    /// Callers currently waiting for a semaphore permit.
    wait_count: AtomicU64,
}
226
/// Emits a debug log line when the pool internals are dropped; no other
/// cleanup is performed here.
impl<C, E> Drop for PoolInternals<C, E> {
    fn drop(&mut self) {
        log::debug!("Pool internal drop");
    }
}
232
/// A generic connection pool.
///
/// A `Pool` is a handle to reference-counted shared state; cloning it is cheap
/// and every clone operates on the same set of connections.
pub struct Pool<M: Manager>(Arc<SharedPool<M>>);
235
236/// Returns a new `Pool` referencing the same state as `self`.
237impl<M: Manager> Clone for Pool<M> {
238    fn clone(&self) -> Self {
239        Pool(self.0.clone())
240    }
241}
242
/// Information about the state of a `Pool`.
///
/// A value of this type is a snapshot; it is not updated after it has been
/// returned.
pub struct State {
    /// Maximum number of open connections to the database
    pub max_open: u64,

    // Pool Status
    /// The number of established connections both in use and idle.
    pub connections: u64,
    /// The number of connections currently in use.
    pub in_use: u64,
    /// The number of idle connections.
    pub idle: u64,

    // Counters
    /// The total number of connections waited for.
    pub wait_count: u64,
    /// The total time blocked waiting for a new connection.
    pub wait_duration: Duration,
    /// The total number of connections closed due to `max_idle`.
    pub max_idle_closed: u64,
    /// The total number of connections closed due to `max_lifetime`.
    pub max_lifetime_closed: u64,
}

impl fmt::Debug for State {
    /// Formats every field under the type's real name, `State`
    /// (the output previously started with `Stats`).
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("State")
            .field("max_open", &self.max_open)
            .field("connections", &self.connections)
            .field("in_use", &self.in_use)
            .field("idle", &self.idle)
            .field("wait_count", &self.wait_count)
            .field("wait_duration", &self.wait_duration)
            .field("max_idle_closed", &self.max_idle_closed)
            .field("max_lifetime_closed", &self.max_lifetime_closed)
            .finish()
    }
}
281
/// Dropping a handle performs no cleanup of its own; the inner `Arc` is
/// released afterwards like any other field. As with every type that
/// implements `Drop`, the inner `Arc` cannot be moved out of a `Pool` by
/// destructuring.
impl<M: Manager> Drop for Pool<M> {
    fn drop(&mut self) {}
}
285
286impl<M: Manager> Pool<M> {
287    /// Creates a new connection pool with a default configuration.
288    pub fn new(manager: M) -> Pool<M> {
289        Pool::builder().build(manager)
290    }
291
292    /// Returns a builder type to configure a new pool.
293    pub fn builder() -> Builder<M> {
294        Builder::new()
295    }
296
297    /// Sets the maximum number of connections managed by the pool.
298    ///
299    /// 0 means unlimited, defaults to 10.
300    pub async fn set_max_open_conns(&self, n: u64) {
301        let mut internals = self.0.internals.lock().await;
302        internals.config.max_open = n;
303        if n > 0 && internals.config.max_idle > n {
304            drop(internals);
305            self.set_max_idle_conns(n).await;
306        }
307    }
308
309    /// Sets the maximum idle connection count maintained by the pool.
310    ///
311    /// The pool will maintain at most this many idle connections
312    /// at all times, while respecting the value of `max_open`.
313    ///
314    /// Defaults to 2.
315    pub async fn set_max_idle_conns(&self, n: u64) {
316        let mut internals = self.0.internals.lock().await;
317        internals.config.max_idle =
318            if internals.config.max_open > 0 && n > internals.config.max_open {
319                internals.config.max_open
320            } else {
321                n
322            };
323
324        let max_idle = internals.config.max_idle as usize;
325        if internals.free_conns.len() > max_idle {
326            let closing = internals.free_conns.split_off(max_idle);
327            for conn in closing {
328                conn.close(&self.0.state);
329            }
330        }
331    }
332
333    /// Sets the maximum lifetime of connections in the pool.
334    ///
335    /// Expired connections may be closed lazily before reuse.
336    ///
337    /// None meas reuse forever.
338    /// Defaults to None.
339    ///
340    /// # Panics
341    ///
342    /// Panics if `max_lifetime` is the zero `Duration`.
343    pub async fn set_conn_max_lifetime(&self, max_lifetime: Option<Duration>) {
344        assert_ne!(
345            max_lifetime,
346            Some(Duration::from_secs(0)),
347            "max_lifetime must be positive"
348        );
349        let mut internals = self.0.internals.lock().await;
350        internals.config.max_lifetime = max_lifetime;
351        if let Some(lifetime) = max_lifetime {
352            match internals.config.max_lifetime {
353                Some(prev) if lifetime < prev && internals.cleaner_ch.is_some() => {
354                    // FIXME
355                    let _ = internals.cleaner_ch.as_mut().unwrap().send(()).await;
356                }
357                _ => (),
358            }
359        }
360
361        if max_lifetime.is_some()
362            && self.0.state.num_open.load(Ordering::Relaxed) > 0
363            && internals.cleaner_ch.is_none()
364        {
365            log::debug!("run connection cleaner");
366            let shared1 = Arc::downgrade(&self.0);
367            let clean_rate = self.0.config.clean_rate;
368            let (cleaner_ch_sender, cleaner_ch) = mpsc::channel(1);
369            internals.cleaner_ch = Some(cleaner_ch_sender);
370            self.0.manager.spawn_task(async move {
371                connection_cleaner(shared1, cleaner_ch, clean_rate).await;
372            });
373        }
374    }
375
376    pub(crate) fn new_inner(manager: M, config: Config) -> Self {
377        let max_open = if config.max_open == 0 {
378            CONNECTION_REQUEST_QUEUE_SIZE
379        } else {
380            config.max_open as usize
381        };
382
383        gauge!(IDLE_CONNECTIONS, max_open as f64);
384
385        let (share_config, internal_config) = config.split();
386        let internals = Mutex::new(PoolInternals {
387            config: internal_config,
388            free_conns: Vec::new(),
389            wait_duration: Duration::from_secs(0),
390            cleaner_ch: None,
391        });
392
393        let pool_state = PoolState {
394            num_open: AtomicU64::new(0),
395            max_lifetime_closed: AtomicU64::new(0),
396            wait_count: AtomicU64::new(0),
397            max_idle_closed: AtomicU64::new(0),
398        };
399
400        let shared = Arc::new(SharedPool {
401            config: share_config,
402            manager,
403            internals,
404            semaphore: Semaphore::new(max_open),
405            state: pool_state,
406        });
407
408        Pool(shared)
409    }
410
411    /// Returns a single connection by either opening a new connection
412    /// or returning an existing connection from the connection pool. Conn will
413    /// block until either a connection is returned or timeout.
414    pub async fn get(&self) -> Result<Connection<M>, Error<M::Error>> {
415        match self.0.config.get_timeout {
416            Some(duration) => self.get_timeout(duration).await,
417            None => self.inner_get_with_retries().await,
418        }
419    }
420
421    /// Retrieves a connection from the pool, waiting for at most `timeout`
422    ///
423    /// The given timeout will be used instead of the configured connection
424    /// timeout.
425    pub async fn get_timeout(&self, duration: Duration) -> Result<Connection<M>, Error<M::Error>> {
426        time::timeout(duration, self.inner_get_with_retries()).await
427    }
428
429    async fn inner_get_with_retries(&self) -> Result<Connection<M>, Error<M::Error>> {
430        let mut try_times: u32 = 0;
431        let config = &self.0.config;
432        loop {
433            try_times += 1;
434            match self.get_connection().await {
435                Ok(conn) => return Ok(conn),
436                Err(Error::BadConn) => {
437                    if try_times == config.max_bad_conn_retries {
438                        return self.get_connection().await;
439                    }
440                    continue;
441                }
442                Err(err) => return Err(err),
443            }
444        }
445    }
446
447    async fn get_connection(&self) -> Result<Connection<M>, Error<M::Error>> {
448        let mut c = self.get_or_create_conn().await?;
449        c.last_used_at = Instant::now();
450
451        let conn = Connection {
452            pool: Some(self.clone()),
453            conn: Some(c),
454        };
455
456        increment_gauge!(ACTIVE_CONNECTIONS, 1.0);
457        decrement_gauge!(WAIT_COUNT, 1.0);
458
459        Ok(conn)
460    }
461
462    async fn validate_conn(
463        &self,
464        internal_config: InternalConfig,
465        conn: &mut Conn<M::Connection, M::Error>,
466    ) -> bool {
467        if conn.brand_new {
468            return true;
469        }
470
471        if conn.expired(internal_config.max_lifetime) {
472            return false;
473        }
474
475        if conn.idle_expired(internal_config.max_idle_lifetime) {
476            return false;
477        }
478
479        let needs_health_check = self.0.config.health_check
480            && conn.needs_health_check(self.0.config.health_check_interval);
481
482        if needs_health_check {
483            let raw = conn.raw.take().unwrap();
484            match self.0.manager.check(raw).await {
485                Ok(raw) => {
486                    conn.last_checked_at = Instant::now();
487                    conn.raw = Some(raw)
488                }
489                Err(_e) => return false,
490            }
491        }
492        true
493    }
494
495    async fn get_or_create_conn(&self) -> Result<Conn<M::Connection, M::Error>, Error<M::Error>> {
496        self.0.state.wait_count.fetch_add(1, Ordering::Relaxed);
497        increment_gauge!(WAIT_COUNT, 1.0);
498        let wait_start = Instant::now();
499
500        let permit = self
501            .0
502            .semaphore
503            .acquire()
504            .await
505            .map_err(|_| Error::PoolClosed)?;
506
507        self.0.state.wait_count.fetch_sub(1, Ordering::SeqCst);
508
509        let mut internals = self.0.internals.lock().await;
510
511        internals.wait_duration += wait_start.elapsed();
512        histogram!(WAIT_DURATION, wait_start.elapsed());
513
514        let conn = internals.free_conns.pop();
515        let internal_config = internals.config.clone();
516        drop(internals);
517
518        if conn.is_some() {
519            let mut conn = conn.unwrap();
520            if self.validate_conn(internal_config, &mut conn).await {
521                decrement_gauge!(IDLE_CONNECTIONS, 1.0);
522                permit.forget();
523                return Ok(conn);
524            } else {
525                conn.close(&self.0.state);
526            }
527        }
528
529        let create_r = self.open_new_connection().await;
530
531        if create_r.is_ok() {
532            decrement_gauge!(IDLE_CONNECTIONS, 1.0);
533            permit.forget();
534        }
535
536        create_r
537    }
538
539    async fn open_new_connection(&self) -> Result<Conn<M::Connection, M::Error>, Error<M::Error>> {
540        log::debug!("creating new connection from manager");
541        match self.0.manager.connect().await {
542            Ok(c) => {
543                self.0.state.num_open.fetch_add(1, Ordering::Relaxed);
544                increment_gauge!(OPENED_TOTAL, 1.0);
545                increment_counter!(OPEN_CONNECTIONS);
546
547                let conn = Conn {
548                    raw: Some(c),
549                    last_err: Mutex::new(None),
550                    created_at: Instant::now(),
551                    last_used_at: Instant::now(),
552                    last_checked_at: Instant::now(),
553                    brand_new: true,
554                };
555
556                Ok(conn)
557            }
558            Err(e) => Err(Error::Inner(e)),
559        }
560    }
561
562    /// Returns information about the current state of the pool.
563    /// It is better to use the metrics than this method, this method
564    /// requires a lock on the internals
565    pub async fn state(&self) -> State {
566        let internals = self.0.internals.lock().await;
567        let num_free_conns = internals.free_conns.len() as u64;
568        let wait_duration = internals.wait_duration;
569        let max_open = internals.config.max_open;
570        drop(internals);
571        State {
572            max_open,
573
574            connections: self.0.state.num_open.load(Ordering::Relaxed),
575            in_use: self.0.state.num_open.load(Ordering::Relaxed) - num_free_conns,
576            idle: num_free_conns,
577
578            wait_count: self.0.state.wait_count.load(Ordering::Relaxed),
579            wait_duration,
580            max_idle_closed: self.0.state.max_idle_closed.load(Ordering::Relaxed),
581            max_lifetime_closed: self.0.state.max_lifetime_closed.load(Ordering::Relaxed),
582        }
583    }
584}
585
586async fn recycle_conn<M: Manager>(
587    shared: &Arc<SharedPool<M>>,
588    mut conn: Conn<M::Connection, M::Error>,
589) {
590    if conn_still_valid(shared, &mut conn) {
591        conn.brand_new = false;
592        let internals = shared.internals.lock().await;
593        put_idle_conn(shared, internals, conn);
594    } else {
595        conn.close(&shared.state);
596    }
597
598    shared.semaphore.add_permits(1);
599}
600
601fn conn_still_valid<M: Manager>(
602    shared: &Arc<SharedPool<M>>,
603    conn: &mut Conn<M::Connection, M::Error>,
604) -> bool {
605    if conn.raw.is_none() {
606        return false;
607    }
608
609    if !shared.manager.validate(conn.raw.as_mut().unwrap()) {
610        log::debug!("bad conn when check in");
611        return false;
612    }
613
614    true
615}
616
617fn put_idle_conn<M: Manager>(
618    shared: &Arc<SharedPool<M>>,
619    mut internals: MutexGuard<'_, PoolInternals<M::Connection, M::Error>>,
620    conn: Conn<M::Connection, M::Error>,
621) {
622    if internals.config.max_idle > internals.free_conns.len() as u64 {
623        internals.free_conns.push(conn);
624        drop(internals);
625    } else {
626        conn.close(&shared.state);
627    }
628}
629
630async fn connection_cleaner<M: Manager>(
631    shared: Weak<SharedPool<M>>,
632    mut cleaner_ch: Receiver<()>,
633    clean_rate: Duration,
634) {
635    let mut interval = interval(clean_rate);
636    interval.tick().await;
637    loop {
638        select! {
639            _ = interval.tick().fuse() => (),
640            r = cleaner_ch.next().fuse() => match r{
641                Some(()) => (),
642                None=> return
643            },
644        }
645
646        if !clean_connection(&shared).await {
647            return;
648        }
649    }
650}
651
652async fn clean_connection<M: Manager>(shared: &Weak<SharedPool<M>>) -> bool {
653    let shared = match shared.upgrade() {
654        Some(shared) => shared,
655        None => {
656            log::debug!("Failed to clean connections");
657            return false;
658        }
659    };
660
661    log::debug!("Clean connections");
662
663    let mut internals = shared.internals.lock().await;
664    if shared.state.num_open.load(Ordering::Relaxed) == 0 || internals.config.max_lifetime.is_none()
665    {
666        internals.cleaner_ch.take();
667        return false;
668    }
669
670    let expired = Instant::now() - internals.config.max_lifetime.unwrap();
671    let mut closing = vec![];
672
673    let mut i = 0;
674    log::debug!(
675        "clean connections, idle conns {}",
676        internals.free_conns.len()
677    );
678
679    loop {
680        if i >= internals.free_conns.len() {
681            break;
682        }
683
684        if internals.free_conns[i].created_at < expired {
685            let c = internals.free_conns.swap_remove(i);
686            closing.push(c);
687            continue;
688        }
689        i += 1;
690    }
691    drop(internals);
692
693    shared
694        .state
695        .max_lifetime_closed
696        .fetch_add(closing.len() as u64, Ordering::Relaxed);
697    for conn in closing {
698        conn.close(&shared.state);
699    }
700    true
701}
702
/// A smart pointer wrapping a connection.
///
/// Dereferences to the manager's connection type. When the value is dropped,
/// the wrapped connection is handed back to the pool on a spawned task.
pub struct Connection<M: Manager> {
    /// Handle to the owning pool; taken in `Drop`.
    pool: Option<Pool<M>>,
    /// The pooled connection; taken in `Drop`.
    conn: Option<Conn<M::Connection, M::Error>>,
}
708
709impl<M: Manager> Connection<M> {
710    /// Returns true is the connection is newly established.
711    pub fn is_brand_new(&self) -> bool {
712        self.conn.as_ref().unwrap().brand_new
713    }
714
715    /// Unwraps the raw database connection.
716    pub fn into_inner(mut self) -> M::Connection {
717        self.conn.as_mut().unwrap().raw.take().unwrap()
718    }
719}
720
721impl<M: Manager> Drop for Connection<M> {
722    fn drop(&mut self) {
723        let pool = self.pool.take().unwrap();
724        let conn = self.conn.take().unwrap();
725
726        decrement_gauge!(ACTIVE_CONNECTIONS, 1.0);
727        increment_gauge!(IDLE_CONNECTIONS, 1.0);
728        // FIXME: No clone!
729        pool.clone().0.manager.spawn_task(async move {
730            recycle_conn(&pool.0, conn).await;
731        });
732    }
733}
734
735impl<M: Manager> Deref for Connection<M> {
736    type Target = M::Connection;
737    fn deref(&self) -> &Self::Target {
738        self.conn.as_ref().unwrap().raw.as_ref().unwrap()
739    }
740}
741
742impl<M: Manager> DerefMut for Connection<M> {
743    fn deref_mut(&mut self) -> &mut M::Connection {
744        self.conn.as_mut().unwrap().raw.as_mut().unwrap()
745    }
746}