zero_postgres/tokio/
pool.rs

1//! Asynchronous connection pool.
2
3use std::mem::ManuallyDrop;
4use std::ops::{Deref, DerefMut};
5use std::sync::Arc;
6
7use crossbeam_queue::ArrayQueue;
8use tokio::sync::Semaphore;
9
10use crate::error::Result;
11use crate::opts::Opts;
12
13use super::Conn;
14
15pub struct Pool {
16    opts: Opts,
17    conns: ArrayQueue<Conn>,
18    semaphore: Option<Arc<Semaphore>>,
19}
20
21impl Pool {
22    pub fn new(opts: Opts) -> Self {
23        let semaphore = opts
24            .pool_max_concurrency
25            .map(|n| Arc::new(Semaphore::new(n)));
26        Self {
27            conns: ArrayQueue::new(opts.pool_max_idle_conn),
28            opts,
29            semaphore,
30        }
31    }
32
33    pub async fn get(self: &Arc<Self>) -> Result<PooledConn> {
34        let permit = if let Some(sem) = &self.semaphore {
35            Some(Arc::clone(sem).acquire_owned().await.unwrap())
36        } else {
37            None
38        };
39        let conn = loop {
40            match self.conns.pop() {
41                Some(mut c) => {
42                    if c.ping().await.is_ok() {
43                        break c;
44                    }
45                    // Connection dead, try next one
46                }
47                None => break Conn::new(self.opts.clone()).await?,
48            }
49        };
50        Ok(PooledConn {
51            conn: ManuallyDrop::new(conn),
52            pool: Arc::clone(self),
53            _permit: permit,
54        })
55    }
56
57    async fn check_in(&self, mut conn: Conn) {
58        if conn.is_broken() {
59            return;
60        }
61        if conn.query_drop("DISCARD ALL").await.is_err() {
62            return;
63        }
64        let _ = self.conns.push(conn);
65    }
66}
67
68pub struct PooledConn {
69    pool: Arc<Pool>,
70    conn: ManuallyDrop<Conn>,
71    _permit: Option<tokio::sync::OwnedSemaphorePermit>,
72}
73
74impl Deref for PooledConn {
75    type Target = Conn;
76    fn deref(&self) -> &Self::Target {
77        &self.conn
78    }
79}
80
81impl DerefMut for PooledConn {
82    fn deref_mut(&mut self) -> &mut Self::Target {
83        &mut self.conn
84    }
85}
86
87impl Drop for PooledConn {
88    fn drop(&mut self) {
89        // SAFETY: conn is never accessed after this
90        let conn = unsafe { ManuallyDrop::take(&mut self.conn) };
91        let pool = Arc::clone(&self.pool);
92        tokio::spawn(async move {
93            pool.check_in(conn).await;
94        });
95    }
96}