zero_postgres/sync/
pool.rs1use std::mem::ManuallyDrop;
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4
5use crossbeam_queue::ArrayQueue;
6use std_semaphore::Semaphore;
7
8use crate::error::Result;
9use crate::opts::Opts;
10
11use super::Conn;
12
13pub struct Pool {
14 opts: Opts,
15 conns: ArrayQueue<Conn>,
16 semaphore: Option<Semaphore>,
17}
18
19impl Pool {
20 pub fn new(opts: Opts) -> Self {
21 let semaphore = opts
22 .pool_max_concurrency
23 .map(|n| Semaphore::new(n as isize));
24 Self {
25 conns: ArrayQueue::new(opts.pool_max_idle_conn),
26 opts,
27 semaphore,
28 }
29 }
30
31 pub fn get(self: &Arc<Self>) -> Result<PooledConn> {
32 if let Some(sem) = &self.semaphore {
33 sem.acquire();
34 }
35 let conn = loop {
36 match self.conns.pop() {
37 Some(mut c) => {
38 if c.ping().is_ok() {
39 break c;
40 }
41 }
43 None => break Conn::new(self.opts.clone())?,
44 }
45 };
46 Ok(PooledConn {
47 conn: ManuallyDrop::new(conn),
48 pool: Arc::clone(self),
49 })
50 }
51
52 fn check_in(&self, mut conn: Conn) {
53 if conn.is_broken() {
54 return;
55 }
56 if conn.query_drop("DISCARD ALL").is_err() {
57 return;
58 }
59 let _ = self.conns.push(conn);
60 }
61}
62
63pub struct PooledConn {
64 pool: Arc<Pool>,
65 conn: ManuallyDrop<Conn>,
66}
67
68impl Deref for PooledConn {
69 type Target = Conn;
70 fn deref(&self) -> &Self::Target {
71 &self.conn
72 }
73}
74
75impl DerefMut for PooledConn {
76 fn deref_mut(&mut self) -> &mut Self::Target {
77 &mut self.conn
78 }
79}
80
81impl Drop for PooledConn {
82 fn drop(&mut self) {
83 let conn = unsafe { ManuallyDrop::take(&mut self.conn) };
85 self.pool.check_in(conn);
86 if let Some(sem) = &self.pool.semaphore {
87 sem.release();
88 }
89 }
90}