zero_postgres/tokio/
pool.rs1use std::mem::ManuallyDrop;
4use std::ops::{Deref, DerefMut};
5use std::sync::Arc;
6
7use crossbeam_queue::ArrayQueue;
8use tokio::sync::Semaphore;
9
10use crate::error::Result;
11use crate::opts::Opts;
12
13use super::Conn;
14
15pub struct Pool {
16 opts: Opts,
17 conns: ArrayQueue<Conn>,
18 semaphore: Option<Arc<Semaphore>>,
19}
20
21impl Pool {
22 pub fn new(opts: Opts) -> Self {
23 let semaphore = opts
24 .pool_max_concurrency
25 .map(|n| Arc::new(Semaphore::new(n)));
26 Self {
27 conns: ArrayQueue::new(opts.pool_max_idle_conn),
28 opts,
29 semaphore,
30 }
31 }
32
33 pub async fn get(self: &Arc<Self>) -> Result<PooledConn> {
34 let permit = if let Some(sem) = &self.semaphore {
35 Some(Arc::clone(sem).acquire_owned().await.unwrap())
36 } else {
37 None
38 };
39 let conn = loop {
40 match self.conns.pop() {
41 Some(mut c) => {
42 if c.ping().await.is_ok() {
43 break c;
44 }
45 }
47 None => break Conn::new(self.opts.clone()).await?,
48 }
49 };
50 Ok(PooledConn {
51 conn: ManuallyDrop::new(conn),
52 pool: Arc::clone(self),
53 _permit: permit,
54 })
55 }
56
57 async fn check_in(&self, mut conn: Conn) {
58 if conn.is_broken() {
59 return;
60 }
61 if conn.query_drop("DISCARD ALL").await.is_err() {
62 return;
63 }
64 let _ = self.conns.push(conn);
65 }
66}
67
68pub struct PooledConn {
69 pool: Arc<Pool>,
70 conn: ManuallyDrop<Conn>,
71 _permit: Option<tokio::sync::OwnedSemaphorePermit>,
72}
73
74impl Deref for PooledConn {
75 type Target = Conn;
76 fn deref(&self) -> &Self::Target {
77 &self.conn
78 }
79}
80
81impl DerefMut for PooledConn {
82 fn deref_mut(&mut self) -> &mut Self::Target {
83 &mut self.conn
84 }
85}
86
87impl Drop for PooledConn {
88 fn drop(&mut self) {
89 let conn = unsafe { ManuallyDrop::take(&mut self.conn) };
91 let pool = Arc::clone(&self.pool);
92 tokio::spawn(async move {
93 pool.check_in(conn).await;
94 });
95 }
96}