Skip to main content

zero_postgres/tokio/
pool.rs

1//! Asynchronous connection pool.
2
3use std::mem::ManuallyDrop;
4use std::ops::{Deref, DerefMut};
5use std::sync::Arc;
6
7use crossbeam_queue::ArrayQueue;
8use tokio::sync::Semaphore;
9
10use crate::error::{Error, Result};
11use crate::opts::Opts;
12
13use super::Conn;
14
15pub struct Pool {
16    opts: Opts,
17    conns: ArrayQueue<Conn>,
18    semaphore: Option<Arc<Semaphore>>,
19}
20
21impl Pool {
22    pub fn new(opts: Opts) -> Self {
23        let semaphore = opts
24            .pool_max_concurrency
25            .map(|n| Arc::new(Semaphore::new(n)));
26        Self {
27            conns: ArrayQueue::new(opts.pool_max_idle_conn),
28            opts,
29            semaphore,
30        }
31    }
32
33    pub async fn get(self: &Arc<Self>) -> Result<PooledConn> {
34        let permit = if let Some(sem) = &self.semaphore {
35            Some(
36                Arc::clone(sem)
37                    .acquire_owned()
38                    .await
39                    .map_err(|_unhelpful_err| {
40                        Error::LibraryBug("pool semaphore is closed".into())
41                    })?,
42            )
43        } else {
44            None
45        };
46        let conn = loop {
47            match self.conns.pop() {
48                Some(mut c) => {
49                    if c.ping().await.is_ok() {
50                        break c;
51                    }
52                    // Connection dead, try next one
53                }
54                None => break Conn::new(self.opts.clone()).await?,
55            }
56        };
57        Ok(PooledConn {
58            conn: ManuallyDrop::new(conn),
59            pool: Arc::clone(self),
60            _permit: permit,
61        })
62    }
63
64    async fn check_in(&self, mut conn: Conn) {
65        if conn.is_broken() {
66            return;
67        }
68        if conn.query_drop("DISCARD ALL").await.is_err() {
69            return;
70        }
71        let _ = self.conns.push(conn);
72    }
73}
74
75pub struct PooledConn {
76    pool: Arc<Pool>,
77    conn: ManuallyDrop<Conn>,
78    _permit: Option<tokio::sync::OwnedSemaphorePermit>,
79}
80
81impl Deref for PooledConn {
82    type Target = Conn;
83    fn deref(&self) -> &Self::Target {
84        &self.conn
85    }
86}
87
88impl DerefMut for PooledConn {
89    fn deref_mut(&mut self) -> &mut Self::Target {
90        &mut self.conn
91    }
92}
93
94impl Drop for PooledConn {
95    fn drop(&mut self) {
96        // SAFETY: conn is never accessed after this
97        let conn = unsafe { ManuallyDrop::take(&mut self.conn) };
98        let pool = Arc::clone(&self.pool);
99        tokio::spawn(async move {
100            pool.check_in(conn).await;
101        });
102    }
103}