1use std::mem::ManuallyDrop;
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4
5use crossbeam_queue::ArrayQueue;
6use std_semaphore::Semaphore;
7
8use crate::error::Result;
9use crate::opts::Opts;
10
11use super::Conn;
12
13pub struct Pool {
14 opts: Opts,
15 conns: ArrayQueue<Conn>,
16 semaphore: Option<Semaphore>,
17}
18
19impl Pool {
20 pub fn new(opts: Opts) -> Self {
21 let semaphore = opts
22 .pool_max_concurrency
23 .map(|n| Semaphore::new(n as isize));
24 Self {
25 conns: ArrayQueue::new(opts.pool_max_idle_conn),
26 opts,
27 semaphore,
28 }
29 }
30
31 pub fn get(self: &Arc<Self>) -> Result<PooledConn> {
32 if let Some(sem) = &self.semaphore {
33 sem.acquire();
34 }
35 let mut conn = match self.conns.pop() {
36 Some(c) => c,
37 None => Conn::new(self.opts.clone())?,
38 };
39 conn.ping()?;
40 Ok(PooledConn {
41 conn: ManuallyDrop::new(conn),
42 pool: Arc::clone(self),
43 })
44 }
45
46 fn check_in(&self, mut conn: Conn) {
47 if conn.is_broken() {
48 return;
49 }
50 if self.opts.pool_reset_conn && conn.reset().is_err() {
51 return;
52 }
53 let _ = self.conns.push(conn);
54 }
55}
56
57pub struct PooledConn {
58 pool: Arc<Pool>,
59 conn: ManuallyDrop<Conn>,
60}
61
62impl Deref for PooledConn {
63 type Target = Conn;
64 fn deref(&self) -> &Self::Target {
65 &self.conn
66 }
67}
68
69impl DerefMut for PooledConn {
70 fn deref_mut(&mut self) -> &mut Self::Target {
71 &mut self.conn
72 }
73}
74
75impl Drop for PooledConn {
76 fn drop(&mut self) {
77 let conn = unsafe { ManuallyDrop::take(&mut self.conn) };
79 self.pool.check_in(conn);
80 if let Some(sem) = &self.pool.semaphore {
81 sem.release();
82 }
83 }
84}