zero_postgres/sync/
pool.rs

1use std::mem::ManuallyDrop;
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4
5use crossbeam_queue::ArrayQueue;
6use std_semaphore::Semaphore;
7
8use crate::error::Result;
9use crate::opts::Opts;
10
11use super::Conn;
12
/// A pool of reusable [`Conn`] values.
///
/// Idle connections wait in a bounded queue; an optional semaphore caps how
/// many connections may be checked out at the same time.
pub struct Pool {
    /// Options cloned for every new connection the pool opens.
    opts: Opts,
    /// Idle connections available for reuse (capacity `opts.pool_max_idle_conn`).
    conns: ArrayQueue<Conn>,
    /// Checkout limiter; `None` when `opts.pool_max_concurrency` is unset.
    semaphore: Option<Semaphore>,
}
18
19impl Pool {
20    pub fn new(opts: Opts) -> Self {
21        let semaphore = opts
22            .pool_max_concurrency
23            .map(|n| Semaphore::new(n as isize));
24        Self {
25            conns: ArrayQueue::new(opts.pool_max_idle_conn),
26            opts,
27            semaphore,
28        }
29    }
30
31    pub fn get(self: &Arc<Self>) -> Result<PooledConn> {
32        if let Some(sem) = &self.semaphore {
33            sem.acquire();
34        }
35        let conn = loop {
36            match self.conns.pop() {
37                Some(mut c) => {
38                    if c.ping().is_ok() {
39                        break c;
40                    }
41                    // Connection dead, try next one
42                }
43                None => break Conn::new(self.opts.clone())?,
44            }
45        };
46        Ok(PooledConn {
47            conn: ManuallyDrop::new(conn),
48            pool: Arc::clone(self),
49        })
50    }
51
52    fn check_in(&self, mut conn: Conn) {
53        if conn.is_broken() {
54            return;
55        }
56        if conn.query_drop("DISCARD ALL").is_err() {
57            return;
58        }
59        let _ = self.conns.push(conn);
60    }
61}
62
/// A connection checked out of a [`Pool`].
///
/// Dereferences to the underlying [`Conn`]; dropping it hands the connection
/// back to the pool it came from.
pub struct PooledConn {
    /// Pool the connection is handed back to on drop.
    pool: Arc<Pool>,
    /// The checked-out connection, wrapped in `ManuallyDrop` so that `Drop`
    /// can move it out of `&mut self`.
    conn: ManuallyDrop<Conn>,
}
67
68impl Deref for PooledConn {
69    type Target = Conn;
70    fn deref(&self) -> &Self::Target {
71        &self.conn
72    }
73}
74
75impl DerefMut for PooledConn {
76    fn deref_mut(&mut self) -> &mut Self::Target {
77        &mut self.conn
78    }
79}
80
81impl Drop for PooledConn {
82    fn drop(&mut self) {
83        // SAFETY: conn is never accessed after this
84        let conn = unsafe { ManuallyDrop::take(&mut self.conn) };
85        self.pool.check_in(conn);
86        if let Some(sem) = &self.pool.semaphore {
87            sem.release();
88        }
89    }
90}