zero_postgres/tokio/
pool.rs1use std::mem::ManuallyDrop;
4use std::ops::{Deref, DerefMut};
5use std::sync::Arc;
6
7use crossbeam_queue::ArrayQueue;
8use tokio::sync::Semaphore;
9
10use crate::error::{Error, Result};
11use crate::opts::Opts;
12
13use super::Conn;
14
15pub struct Pool {
16 opts: Opts,
17 conns: ArrayQueue<Conn>,
18 semaphore: Option<Arc<Semaphore>>,
19}
20
21impl Pool {
22 pub fn new(opts: Opts) -> Self {
23 let semaphore = opts
24 .pool_max_concurrency
25 .map(|n| Arc::new(Semaphore::new(n)));
26 Self {
27 conns: ArrayQueue::new(opts.pool_max_idle_conn),
28 opts,
29 semaphore,
30 }
31 }
32
33 pub async fn get(self: &Arc<Self>) -> Result<PooledConn> {
34 let permit = if let Some(sem) = &self.semaphore {
35 Some(
36 Arc::clone(sem)
37 .acquire_owned()
38 .await
39 .map_err(|_unhelpful_err| {
40 Error::LibraryBug("pool semaphore is closed".into())
41 })?,
42 )
43 } else {
44 None
45 };
46 let conn = loop {
47 match self.conns.pop() {
48 Some(mut c) => {
49 if c.ping().await.is_ok() {
50 break c;
51 }
52 }
54 None => break Conn::new(self.opts.clone()).await?,
55 }
56 };
57 Ok(PooledConn {
58 conn: ManuallyDrop::new(conn),
59 pool: Arc::clone(self),
60 _permit: permit,
61 })
62 }
63
64 async fn check_in(&self, mut conn: Conn) {
65 if conn.is_broken() {
66 return;
67 }
68 if conn.query_drop("DISCARD ALL").await.is_err() {
69 return;
70 }
71 let _ = self.conns.push(conn);
72 }
73}
74
75pub struct PooledConn {
76 pool: Arc<Pool>,
77 conn: ManuallyDrop<Conn>,
78 _permit: Option<tokio::sync::OwnedSemaphorePermit>,
79}
80
81impl Deref for PooledConn {
82 type Target = Conn;
83 fn deref(&self) -> &Self::Target {
84 &self.conn
85 }
86}
87
88impl DerefMut for PooledConn {
89 fn deref_mut(&mut self) -> &mut Self::Target {
90 &mut self.conn
91 }
92}
93
94impl Drop for PooledConn {
95 fn drop(&mut self) {
96 let conn = unsafe { ManuallyDrop::take(&mut self.conn) };
98 let pool = Arc::clone(&self.pool);
99 tokio::spawn(async move {
100 pool.check_in(conn).await;
101 });
102 }
103}