postro/
pool.rs

1//! Database connection pooling.
2use crate::{Connection, Result, executor::Executor, transport::PgTransport};
3
4mod config;
5
6#[cfg(feature = "tokio")]
7mod worker;
8
9pub use config::PoolConfig;
10
/// Database connection pool.
///
/// A `Pool` value pairs a worker handle with a slot that can park one
/// checked-out [`Connection`] on this particular instance. Cloning copies the
/// handle only; the clone starts with an empty slot.
#[derive(Debug)]
#[clippy::has_significant_drop]
pub struct Pool {
    /// Connection parked on this instance, if any.
    ///
    /// Dropping a [`PoolConnection`] writes its connection into this slot,
    /// [`PoolConnect`] takes it out before asking the handle for another one,
    /// and dropping the `Pool` releases whatever is left through `handle`.
    conn: Option<Connection>,
    /// Handle used to acquire and release connections (`tokio` builds).
    #[cfg(feature = "tokio")]
    handle: worker::WorkerHandle,
    /// Placeholder handle used when the `tokio` feature is disabled.
    #[cfg(not(feature = "tokio"))]
    handle: mock_handle::WorkerHandle,
}
21
22impl Drop for Pool {
23    fn drop(&mut self) {
24        if let Some(conn) = self.conn.take() {
25            self.handle.release(conn);
26        }
27    }
28}
29
30impl Clone for Pool {
31    fn clone(&self) -> Self {
32        Self {
33            conn: None,
34            handle: self.handle.clone(),
35        }
36    }
37}
38
39impl Pool {
40    /// Create [`Pool`] and try to create one connection.
41    pub async fn connect(url: &str) -> Result<Self> {
42        PoolConfig::from_env().connect(url).await
43    }
44
45    /// Create [`Pool`] without trying to create connection.
46    pub fn connect_lazy(url: &str) -> Result<Self> {
47        PoolConfig::from_env().connect_lazy(url)
48    }
49
50    /// Create [`Pool`] and try to create one connection.
51    ///
52    /// See [`Config::from_env`][1] for more details on env.
53    ///
54    /// [1]: crate::Config::from_env
55    pub async fn connect_env() -> Result<Pool> {
56        Self::connect_with(PoolConfig::from_env()).await
57    }
58
59    /// Create [`Pool`] and try to create one connection.
60    pub async fn connect_with(config: PoolConfig) -> Result<Self> {
61        #[cfg(feature = "tokio")]
62        {
63            let (handle,worker) = worker::WorkerHandle::new(config);
64            tokio::spawn(worker);
65            Ok(Self { conn: None, handle })
66        }
67
68        #[cfg(not(feature = "tokio"))]
69        {
70            let _ = config;
71            panic!("runtime disabled")
72        }
73    }
74
75    /// Create [`Pool`] without trying to create connection.
76    pub fn connect_lazy_with(config: PoolConfig) -> Self {
77        #[cfg(feature = "tokio")]
78        {
79            let (handle,worker) = worker::WorkerHandle::new(config);
80            tokio::spawn(worker);
81            Self { conn: None, handle }
82        }
83
84        #[cfg(not(feature = "tokio"))]
85        {
86            let _ = config;
87            panic!("runtime disabled")
88        }
89    }
90
91    fn poll_connection(&mut self, cx: &mut std::task::Context) -> std::task::Poll<Result<Connection>> {
92        self.handle.poll_acquire(cx)
93    }
94}
95
96impl Executor for Pool {
97    type Transport = PoolConnection<'static>;
98
99    type Future = PoolConnect<'static>;
100
101    fn connection(self) -> Self::Future {
102        PoolConnect { pool: Some(PoolCow::Owned(self)) }
103    }
104}
105
106impl Executor for &Pool {
107    type Transport = PoolConnection<'static>;
108
109    type Future = PoolConnect<'static>;
110
111    fn connection(self) -> Self::Future {
112        PoolConnect { pool: Some(PoolCow::Owned(self.clone())) }
113    }
114}
115
116impl<'a> Executor for &'a mut Pool {
117    type Transport = PoolConnection<'a>;
118
119    type Future = PoolConnect<'a>;
120
121    fn connection(self) -> Self::Future {
122        PoolConnect { pool: Some(PoolCow::Borrow(self)) }
123    }
124}
125
/// Future returned from [`Pool`] implementation of [`Executor::connection`].
///
/// Resolves to a [`PoolConnection`] holding a checked-out connection.
#[derive(Debug)]
pub struct PoolConnect<'a> {
    /// Pool to check out from; taken (left `None`) when the future resolves.
    pool: Option<PoolCow<'a>>,
}
131
132impl<'a> Future for PoolConnect<'a> {
133    type Output = Result<PoolConnection<'a>>;
134
135    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
136        use std::task::Poll::*;
137        if let Some(conn) = self.pool.as_mut().unwrap().as_mut().conn.take() {
138            return Ready(Ok(PoolConnection { conn: Some(conn), pool: self.pool.take().unwrap() }))
139        }
140        let conn = std::task::ready!(self.pool.as_mut().unwrap().as_mut().poll_connection(cx)?);
141        crate::common::verbose!(target: "pool_handle", "pool connection checkout");
142        Ready(Ok(PoolConnection { conn: Some(conn), pool: self.pool.take().unwrap() }))
143    }
144}
145
/// Instance of [`Pool`] with the checked out connection.
///
/// On drop the connection is moved back into the pool instance's own slot.
#[derive(Debug)]
pub struct PoolConnection<'a> {
    /// Pool instance (borrowed or owned) the connection goes back to on drop.
    pool: PoolCow<'a>,
    /// The checked-out connection; becomes `None` only while dropping.
    conn: Option<Connection>,
}
152
/// A [`Pool`] that is either mutably borrowed from the caller or owned outright.
#[derive(Debug)]
enum PoolCow<'a> {
    /// Pool borrowed from the caller (used by the `&mut Pool` executor).
    Borrow(&'a mut Pool),
    /// Pool owned by the future/transport (used by the `Pool` and `&Pool` executors).
    Owned(Pool),
}
158
159impl PoolCow<'_> {
160    fn as_ref(&self) -> &Pool {
161        match self {
162            PoolCow::Borrow(pool) => pool,
163            PoolCow::Owned(pool) => pool,
164        }
165    }
166
167    fn as_mut(&mut self) -> &mut Pool {
168        match self {
169            PoolCow::Borrow(pool) => pool,
170            PoolCow::Owned(pool) => pool,
171        }
172    }
173}
174
175impl PoolConnection<'_> {
176    /// Returns the [`Pool`] handle.
177    pub fn pool(&self) -> &Pool {
178        self.pool.as_ref()
179    }
180
181    /// Returns the underlying [`Connection`].
182    pub fn connection(&mut self) -> &mut Connection {
183        // `conn` only `None` on drop
184        self.conn.as_mut().unwrap()
185    }
186}
187
188impl Drop for PoolConnection<'_> {
189    fn drop(&mut self) {
190        self.pool.as_mut().conn = self.conn.take();
191    }
192}
193
194impl PgTransport for PoolConnection<'_> {
195    fn poll_flush(&mut self, cx: &mut std::task::Context) -> std::task::Poll<std::io::Result<()>> {
196        self.connection().poll_flush(cx)
197    }
198
199    fn poll_recv<B: crate::postgres::BackendProtocol>(&mut self, cx: &mut std::task::Context) -> std::task::Poll<Result<B>> {
200        self.connection().poll_recv(cx)
201    }
202
203    fn ready_request(&mut self) {
204        self.connection().ready_request();
205    }
206
207    fn send<F: crate::postgres::FrontendProtocol>(&mut self, message: F) {
208        self.connection().send(message);
209    }
210
211    fn send_startup(&mut self, startup: crate::postgres::frontend::Startup) {
212        self.connection().send_startup(startup);
213    }
214
215    fn get_stmt(&mut self, sql: u64) -> Option<crate::statement::StatementName> {
216        self.connection().get_stmt(sql)
217    }
218
219    fn add_stmt(&mut self, sql: u64, id: crate::statement::StatementName) {
220        self.connection().add_stmt(sql, id);
221    }
222}
223
224#[cfg(not(feature = "tokio"))]
225mod mock_handle {
226    use std::task::{Context, Poll};
227
228    use crate::{Connection, Result};
229
230    #[derive(Debug, Clone)]
231    pub struct WorkerHandle;
232
233    impl WorkerHandle {
234        pub fn poll_acquire(&mut self, _: &mut Context) -> Poll<Result<Connection>> {
235            unreachable!()
236        }
237
238        pub fn release(&self, _: Connection) {
239            unreachable!()
240        }
241    }
242}
243