sqlx_core_oldapi/pool/
connection.rs

1use std::fmt::{self, Debug, Formatter};
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use futures_intrusive::sync::SemaphoreReleaser;
7
8use crate::connection::Connection;
9use crate::database::Database;
10use crate::error::Error;
11
12use super::inner::{DecrementSizeGuard, PoolInner};
13use crate::pool::options::PoolConnectionMetadata;
14use std::future::Future;
15
16/// A connection managed by a [`Pool`][crate::pool::Pool].
17///
18/// Will be returned to the pool on-drop.
19pub struct PoolConnection<DB: Database> {
20    live: Option<Live<DB>>,
21    pub(crate) pool: Arc<PoolInner<DB>>,
22}
23
24pub(super) struct Live<DB: Database> {
25    pub(super) raw: DB::Connection,
26    pub(super) created_at: Instant,
27}
28
29pub(super) struct Idle<DB: Database> {
30    pub(super) live: Live<DB>,
31    pub(super) idle_since: Instant,
32}
33
34/// RAII wrapper for connections being handled by functions that may drop them
35pub(super) struct Floating<DB: Database, C> {
36    pub(super) inner: C,
37    pub(super) guard: DecrementSizeGuard<DB>,
38}
39
40const EXPECT_MSG: &str = "BUG: inner connection already taken!";
41
42impl<DB: Database> Debug for PoolConnection<DB> {
43    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
44        // TODO: Show the type name of the connection ?
45        f.debug_struct("PoolConnection").finish()
46    }
47}
48
49impl<DB: Database> Deref for PoolConnection<DB> {
50    type Target = DB::Connection;
51
52    fn deref(&self) -> &Self::Target {
53        &self.live.as_ref().expect(EXPECT_MSG).raw
54    }
55}
56
57impl<DB: Database> DerefMut for PoolConnection<DB> {
58    fn deref_mut(&mut self) -> &mut Self::Target {
59        &mut self.live.as_mut().expect(EXPECT_MSG).raw
60    }
61}
62
63impl<DB: Database> AsRef<DB::Connection> for PoolConnection<DB> {
64    fn as_ref(&self) -> &DB::Connection {
65        self
66    }
67}
68
69impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
70    fn as_mut(&mut self) -> &mut DB::Connection {
71        self
72    }
73}
74
75impl<DB: Database> PoolConnection<DB> {
76    /// Detach this connection from the pool, allowing it to open a replacement.
77    ///
78    /// Note that if your application uses a single shared pool, this
79    /// effectively lets the application exceed the [`max_connections`] setting.
80    ///
81    /// If [`min_connections`] is nonzero, a task will be spawned to replace this connection.
82    ///
83    /// If you want the pool to treat this connection as permanently checked-out,
84    /// use [`.leak()`][Self::leak] instead.
85    ///
86    /// [`max_connections`]: crate::pool::PoolOptions::max_connections
87    /// [`min_connections`]: crate::pool::PoolOptions::min_connections
88    pub fn detach(mut self) -> DB::Connection {
89        self.take_live().float(self.pool.clone()).detach()
90    }
91
92    /// Detach this connection from the pool, treating it as permanently checked-out.
93    ///
94    /// This effectively will reduce the maximum capacity of the pool by 1 every time it is used.
95    ///
96    /// If you don't want to impact the pool's capacity, use [`.detach()`][Self::detach] instead.
97    pub fn leak(mut self) -> DB::Connection {
98        self.take_live().raw
99    }
100
101    fn take_live(&mut self) -> Live<DB> {
102        self.live.take().expect(EXPECT_MSG)
103    }
104
105    /// Test the connection to make sure it is still live before returning it to the pool.
106    ///
107    /// This effectively runs the drop handler eagerly instead of spawning a task to do it.
108    pub(crate) fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
109        // float the connection in the pool before we move into the task
110        // in case the returned `Future` isn't executed, like if it's spawned into a dying runtime
111        // https://github.com/launchbadge/sqlx/issues/1396
112        // Type hints seem to be broken by `Option` combinators in IntelliJ Rust right now (6/22).
113        let floating: Option<Floating<DB, Live<DB>>> =
114            self.live.take().map(|live| live.float(self.pool.clone()));
115
116        let pool = self.pool.clone();
117
118        async move {
119            let returned_to_pool = if let Some(floating) = floating {
120                floating.return_to_pool().await
121            } else {
122                false
123            };
124
125            if !returned_to_pool {
126                pool.min_connections_maintenance(None).await;
127            }
128        }
129    }
130}
131
132/// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from.
133impl<DB: Database> Drop for PoolConnection<DB> {
134    fn drop(&mut self) {
135        // We still need to spawn a task to maintain `min_connections`.
136        if self.live.is_some() || self.pool.options.min_connections > 0 {
137            #[cfg(not(feature = "_rt-async-std"))]
138            if let Ok(handle) = sqlx_rt::Handle::try_current() {
139                handle.spawn(self.return_to_pool());
140            }
141
142            #[cfg(feature = "_rt-async-std")]
143            sqlx_rt::spawn(self.return_to_pool());
144        }
145    }
146}
147
148impl<DB: Database> Live<DB> {
149    pub fn float(self, pool: Arc<PoolInner<DB>>) -> Floating<DB, Self> {
150        Floating {
151            inner: self,
152            // create a new guard from a previously leaked permit
153            guard: DecrementSizeGuard::new_permit(pool),
154        }
155    }
156
157    pub fn into_idle(self) -> Idle<DB> {
158        Idle {
159            live: self,
160            idle_since: Instant::now(),
161        }
162    }
163}
164
165impl<DB: Database> Deref for Idle<DB> {
166    type Target = Live<DB>;
167
168    fn deref(&self) -> &Self::Target {
169        &self.live
170    }
171}
172
173impl<DB: Database> DerefMut for Idle<DB> {
174    fn deref_mut(&mut self) -> &mut Self::Target {
175        &mut self.live
176    }
177}
178
179impl<DB: Database> Floating<DB, Live<DB>> {
180    pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
181        Self {
182            inner: Live {
183                raw: conn,
184                created_at: Instant::now(),
185            },
186            guard,
187        }
188    }
189
190    pub fn reattach(self) -> PoolConnection<DB> {
191        let Floating { inner, guard } = self;
192
193        let pool = Arc::clone(&guard.pool);
194
195        guard.cancel();
196        PoolConnection {
197            live: Some(inner),
198            pool,
199        }
200    }
201
202    pub fn release(self) {
203        self.guard.pool.clone().release(self);
204    }
205
206    /// Return the connection to the pool.
207    ///
208    /// Returns `true` if the connection was successfully returned, `false` if it was closed.
209    async fn return_to_pool(mut self) -> bool {
210        // Immediately close the connection.
211        if self.guard.pool.is_closed() {
212            self.close().await;
213            return false;
214        }
215
216        if let Some(test) = &self.guard.pool.options.after_release {
217            let meta = self.metadata();
218            match (test)(&mut self.inner.raw, meta).await {
219                Ok(true) => (),
220                Ok(false) => {
221                    self.close().await;
222                    return false;
223                }
224                Err(e) => {
225                    log::warn!("error from after_release: {}", e);
226                    // Connection is broken, don't try to gracefully close as
227                    // something weird might happen.
228                    self.close_hard().await;
229                    return false;
230                }
231            }
232        }
233
234        // test the connection on-release to ensure it is still viable,
235        // and flush anything time-sensitive like transaction rollbacks
236        // if an Executor future/stream is dropped during an `.await` call, the connection
237        // is likely to be left in an inconsistent state, in which case it should not be
238        // returned to the pool; also of course, if it was dropped due to an error
239        // this is simply a band-aid as SQLx-next connections should be able
240        // to recover from cancellations
241        if let Err(e) = self.raw.ping().await {
242            log::warn!(
243                "error occurred while testing the connection on-release: {}",
244                e
245            );
246
247            // Connection is broken, don't try to gracefully close.
248            self.close_hard().await;
249            false
250        } else {
251            // if the connection is still viable, release it to the pool
252            self.release();
253            true
254        }
255    }
256
257    pub async fn close(self) {
258        // This isn't used anywhere that we care about the return value
259        let _ = self.inner.raw.close().await;
260
261        // `guard` is dropped as intended
262    }
263
264    pub async fn close_hard(self) {
265        let _ = self.inner.raw.close_hard().await;
266    }
267
268    pub fn detach(self) -> DB::Connection {
269        self.inner.raw
270    }
271
272    pub fn into_idle(self) -> Floating<DB, Idle<DB>> {
273        Floating {
274            inner: self.inner.into_idle(),
275            guard: self.guard,
276        }
277    }
278
279    pub fn metadata(&self) -> PoolConnectionMetadata {
280        PoolConnectionMetadata {
281            age: self.created_at.elapsed(),
282            idle_for: Duration::ZERO,
283        }
284    }
285}
286
287impl<DB: Database> Floating<DB, Idle<DB>> {
288    pub fn from_idle(
289        idle: Idle<DB>,
290        pool: Arc<PoolInner<DB>>,
291        permit: SemaphoreReleaser<'_>,
292    ) -> Self {
293        Self {
294            inner: idle,
295            guard: DecrementSizeGuard::from_permit(pool, permit),
296        }
297    }
298
299    pub async fn ping(&mut self) -> Result<(), Error> {
300        self.live.raw.ping().await
301    }
302
303    pub fn into_live(self) -> Floating<DB, Live<DB>> {
304        Floating {
305            inner: self.inner.live,
306            guard: self.guard,
307        }
308    }
309
310    pub async fn close(self) -> DecrementSizeGuard<DB> {
311        if let Err(e) = self.inner.live.raw.close().await {
312            log::debug!("error occurred while closing the pool connection: {}", e);
313        }
314        self.guard
315    }
316
317    pub async fn close_hard(self) -> DecrementSizeGuard<DB> {
318        let _ = self.inner.live.raw.close_hard().await;
319
320        self.guard
321    }
322
323    pub fn metadata(&self) -> PoolConnectionMetadata {
324        // Use a single `now` value for consistency.
325        let now = Instant::now();
326
327        PoolConnectionMetadata {
328            // NOTE: the receiver is the later `Instant` and the arg is the earlier
329            // https://github.com/launchbadge/sqlx/issues/1912
330            age: now.saturating_duration_since(self.created_at),
331            idle_for: now.saturating_duration_since(self.idle_since),
332        }
333    }
334}
335
336impl<DB: Database, C> Deref for Floating<DB, C> {
337    type Target = C;
338
339    fn deref(&self) -> &Self::Target {
340        &self.inner
341    }
342}
343
344impl<DB: Database, C> DerefMut for Floating<DB, C> {
345    fn deref_mut(&mut self) -> &mut Self::Target {
346        &mut self.inner
347    }
348}