sqlx_build_trust_core/pool/
connection.rs

1use std::fmt::{self, Debug, Formatter};
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::sync::AsyncSemaphoreReleaser;
7
8use crate::connection::Connection;
9use crate::database::Database;
10use crate::error::Error;
11
12use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner};
13use crate::pool::options::PoolConnectionMetadata;
14use std::future::Future;
15
/// A connection managed by a [`Pool`][crate::pool::Pool].
///
/// Will be returned to the pool on-drop.
pub struct PoolConnection<DB: Database> {
    // The checked-out connection. Becomes `None` once it has been taken out of this handle
    // (by `close()`, `detach()`, `leak()` or `return_to_pool()`); dereferencing the handle
    // after that point panics with `EXPECT_MSG`.
    live: Option<Live<DB>>,
    // Shared handle to the pool state; cloned whenever the connection is floated or returned.
    pub(crate) pool: Arc<PoolInner<DB>>,
}
23
/// A raw database connection paired with the instant it was created.
pub(super) struct Live<DB: Database> {
    /// The underlying database connection.
    pub(super) raw: DB::Connection,
    /// When this connection was created (set to `Instant::now()` by `Floating::new_live`);
    /// used to report the connection's age and for the max-lifetime check.
    pub(super) created_at: Instant,
}
28
/// A [`Live`] connection together with the instant at which it was marked idle.
pub(super) struct Idle<DB: Database> {
    /// The wrapped live connection.
    pub(super) live: Live<DB>,
    /// Set to `Instant::now()` by `Live::into_idle`; used to report how long
    /// the connection has been idle.
    pub(super) idle_since: Instant,
}
33
/// RAII wrapper for connections being handled by functions that may drop them
///
/// `C` is the connection state being carried; in this file it is either
/// `Live<DB>` or `Idle<DB>`.
pub(super) struct Floating<DB: Database, C> {
    /// The wrapped connection state.
    pub(super) inner: C,
    /// Guard that travels with the connection and is dropped with this wrapper unless it is
    /// explicitly cancelled (see `reattach`) or handed back to the caller (see `close`).
    // NOTE(review): presumably decrements the pool's size counter on drop, going by the
    // type name — confirm against `DecrementSizeGuard`.
    pub(super) guard: DecrementSizeGuard<DB>,
}
39
// Panic message used whenever a `PoolConnection` is accessed after its inner
// `Live` connection has already been taken out of the `Option`.
const EXPECT_MSG: &str = "BUG: inner connection already taken!";
41
42impl<DB: Database> Debug for PoolConnection<DB> {
43    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
44        // TODO: Show the type name of the connection ?
45        f.debug_struct("PoolConnection").finish()
46    }
47}
48
49impl<DB: Database> Deref for PoolConnection<DB> {
50    type Target = DB::Connection;
51
52    fn deref(&self) -> &Self::Target {
53        &self.live.as_ref().expect(EXPECT_MSG).raw
54    }
55}
56
57impl<DB: Database> DerefMut for PoolConnection<DB> {
58    fn deref_mut(&mut self) -> &mut Self::Target {
59        &mut self.live.as_mut().expect(EXPECT_MSG).raw
60    }
61}
62
63impl<DB: Database> AsRef<DB::Connection> for PoolConnection<DB> {
64    fn as_ref(&self) -> &DB::Connection {
65        self
66    }
67}
68
69impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
70    fn as_mut(&mut self) -> &mut DB::Connection {
71        self
72    }
73}
74
75impl<DB: Database> PoolConnection<DB> {
76    /// Close this connection, allowing the pool to open a replacement.
77    ///
78    /// Equivalent to calling [`.detach()`] then [`.close()`], but the connection permit is retained
79    /// for the duration so that the pool may not exceed `max_connections`.
80    ///
81    /// [`.detach()`]: PoolConnection::detach
82    /// [`.close()`]: Connection::close
83    pub async fn close(mut self) -> Result<(), Error> {
84        let floating = self.take_live().float(self.pool.clone());
85        floating.inner.raw.close().await
86    }
87
88    /// Detach this connection from the pool, allowing it to open a replacement.
89    ///
90    /// Note that if your application uses a single shared pool, this
91    /// effectively lets the application exceed the [`max_connections`] setting.
92    ///
93    /// If [`min_connections`] is nonzero, a task will be spawned to replace this connection.
94    ///
95    /// If you want the pool to treat this connection as permanently checked-out,
96    /// use [`.leak()`][Self::leak] instead.
97    ///
98    /// [`max_connections`]: crate::pool::PoolOptions::max_connections
99    /// [`min_connections`]: crate::pool::PoolOptions::min_connections
100    pub fn detach(mut self) -> DB::Connection {
101        self.take_live().float(self.pool.clone()).detach()
102    }
103
104    /// Detach this connection from the pool, treating it as permanently checked-out.
105    ///
106    /// This effectively will reduce the maximum capacity of the pool by 1 every time it is used.
107    ///
108    /// If you don't want to impact the pool's capacity, use [`.detach()`][Self::detach] instead.
109    pub fn leak(mut self) -> DB::Connection {
110        self.take_live().raw
111    }
112
113    fn take_live(&mut self) -> Live<DB> {
114        self.live.take().expect(EXPECT_MSG)
115    }
116
117    /// Test the connection to make sure it is still live before returning it to the pool.
118    ///
119    /// This effectively runs the drop handler eagerly instead of spawning a task to do it.
120    #[doc(hidden)]
121    pub fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
122        // float the connection in the pool before we move into the task
123        // in case the returned `Future` isn't executed, like if it's spawned into a dying runtime
124        // https://github.com/launchbadge/sqlx/issues/1396
125        // Type hints seem to be broken by `Option` combinators in IntelliJ Rust right now (6/22).
126        let floating: Option<Floating<DB, Live<DB>>> =
127            self.live.take().map(|live| live.float(self.pool.clone()));
128
129        let pool = self.pool.clone();
130
131        async move {
132            let returned_to_pool = if let Some(floating) = floating {
133                floating.return_to_pool().await
134            } else {
135                false
136            };
137
138            if !returned_to_pool {
139                pool.min_connections_maintenance(None).await;
140            }
141        }
142    }
143}
144
145impl<'c, DB: Database> crate::acquire::Acquire<'c> for &'c mut PoolConnection<DB> {
146    type Database = DB;
147
148    type Connection = &'c mut <DB as Database>::Connection;
149
150    #[inline]
151    fn acquire(self) -> futures_core::future::BoxFuture<'c, Result<Self::Connection, Error>> {
152        Box::pin(futures_util::future::ok(&mut **self))
153    }
154
155    #[inline]
156    fn begin(
157        self,
158    ) -> futures_core::future::BoxFuture<'c, Result<crate::transaction::Transaction<'c, DB>, Error>>
159    {
160        crate::transaction::Transaction::begin(&mut **self)
161    }
162}
163
164/// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from.
165impl<DB: Database> Drop for PoolConnection<DB> {
166    fn drop(&mut self) {
167        // We still need to spawn a task to maintain `min_connections`.
168        if self.live.is_some() || self.pool.options.min_connections > 0 {
169            crate::rt::spawn(self.return_to_pool());
170        }
171    }
172}
173
174impl<DB: Database> Live<DB> {
175    pub fn float(self, pool: Arc<PoolInner<DB>>) -> Floating<DB, Self> {
176        Floating {
177            inner: self,
178            // create a new guard from a previously leaked permit
179            guard: DecrementSizeGuard::new_permit(pool),
180        }
181    }
182
183    pub fn into_idle(self) -> Idle<DB> {
184        Idle {
185            live: self,
186            idle_since: Instant::now(),
187        }
188    }
189}
190
191impl<DB: Database> Deref for Idle<DB> {
192    type Target = Live<DB>;
193
194    fn deref(&self) -> &Self::Target {
195        &self.live
196    }
197}
198
199impl<DB: Database> DerefMut for Idle<DB> {
200    fn deref_mut(&mut self) -> &mut Self::Target {
201        &mut self.live
202    }
203}
204
205impl<DB: Database> Floating<DB, Live<DB>> {
206    pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
207        Self {
208            inner: Live {
209                raw: conn,
210                created_at: Instant::now(),
211            },
212            guard,
213        }
214    }
215
216    pub fn reattach(self) -> PoolConnection<DB> {
217        let Floating { inner, guard } = self;
218
219        let pool = Arc::clone(&guard.pool);
220
221        guard.cancel();
222        PoolConnection {
223            live: Some(inner),
224            pool,
225        }
226    }
227
228    pub fn release(self) {
229        self.guard.pool.clone().release(self);
230    }
231
232    /// Return the connection to the pool.
233    ///
234    /// Returns `true` if the connection was successfully returned, `false` if it was closed.
235    async fn return_to_pool(mut self) -> bool {
236        // Immediately close the connection.
237        if self.guard.pool.is_closed() {
238            self.close().await;
239            return false;
240        }
241
242        // If the connection is beyond max lifetime, close the connection and
243        // immediately create a new connection
244        if is_beyond_max_lifetime(&self.inner, &self.guard.pool.options) {
245            self.close().await;
246            return false;
247        }
248
249        if let Some(test) = &self.guard.pool.options.after_release {
250            let meta = self.metadata();
251            match (test)(&mut self.inner.raw, meta).await {
252                Ok(true) => (),
253                Ok(false) => {
254                    self.close().await;
255                    return false;
256                }
257                Err(error) => {
258                    tracing::warn!(%error, "error from `after_release`");
259                    // Connection is broken, don't try to gracefully close as
260                    // something weird might happen.
261                    self.close_hard().await;
262                    return false;
263                }
264            }
265        }
266
267        // test the connection on-release to ensure it is still viable,
268        // and flush anything time-sensitive like transaction rollbacks
269        // if an Executor future/stream is dropped during an `.await` call, the connection
270        // is likely to be left in an inconsistent state, in which case it should not be
271        // returned to the pool; also of course, if it was dropped due to an error
272        // this is simply a band-aid as SQLx-next connections should be able
273        // to recover from cancellations
274        if let Err(error) = self.raw.ping().await {
275            tracing::warn!(
276                %error,
277                "error occurred while testing the connection on-release",
278            );
279
280            // Connection is broken, don't try to gracefully close.
281            self.close_hard().await;
282            false
283        } else {
284            // if the connection is still viable, release it to the pool
285            self.release();
286            true
287        }
288    }
289
290    pub async fn close(self) {
291        // This isn't used anywhere that we care about the return value
292        let _ = self.inner.raw.close().await;
293
294        // `guard` is dropped as intended
295    }
296
297    pub async fn close_hard(self) {
298        let _ = self.inner.raw.close_hard().await;
299    }
300
301    pub fn detach(self) -> DB::Connection {
302        self.inner.raw
303    }
304
305    pub fn into_idle(self) -> Floating<DB, Idle<DB>> {
306        Floating {
307            inner: self.inner.into_idle(),
308            guard: self.guard,
309        }
310    }
311
312    pub fn metadata(&self) -> PoolConnectionMetadata {
313        PoolConnectionMetadata {
314            age: self.created_at.elapsed(),
315            idle_for: Duration::ZERO,
316        }
317    }
318}
319
320impl<DB: Database> Floating<DB, Idle<DB>> {
321    pub fn from_idle(
322        idle: Idle<DB>,
323        pool: Arc<PoolInner<DB>>,
324        permit: AsyncSemaphoreReleaser<'_>,
325    ) -> Self {
326        Self {
327            inner: idle,
328            guard: DecrementSizeGuard::from_permit(pool, permit),
329        }
330    }
331
332    pub async fn ping(&mut self) -> Result<(), Error> {
333        self.live.raw.ping().await
334    }
335
336    pub fn into_live(self) -> Floating<DB, Live<DB>> {
337        Floating {
338            inner: self.inner.live,
339            guard: self.guard,
340        }
341    }
342
343    pub async fn close(self) -> DecrementSizeGuard<DB> {
344        if let Err(error) = self.inner.live.raw.close().await {
345            tracing::debug!(%error, "error occurred while closing the pool connection");
346        }
347        self.guard
348    }
349
350    pub async fn close_hard(self) -> DecrementSizeGuard<DB> {
351        let _ = self.inner.live.raw.close_hard().await;
352
353        self.guard
354    }
355
356    pub fn metadata(&self) -> PoolConnectionMetadata {
357        // Use a single `now` value for consistency.
358        let now = Instant::now();
359
360        PoolConnectionMetadata {
361            // NOTE: the receiver is the later `Instant` and the arg is the earlier
362            // https://github.com/launchbadge/sqlx/issues/1912
363            age: now.saturating_duration_since(self.created_at),
364            idle_for: now.saturating_duration_since(self.idle_since),
365        }
366    }
367}
368
369impl<DB: Database, C> Deref for Floating<DB, C> {
370    type Target = C;
371
372    fn deref(&self) -> &Self::Target {
373        &self.inner
374    }
375}
376
377impl<DB: Database, C> DerefMut for Floating<DB, C> {
378    fn deref_mut(&mut self) -> &mut Self::Target {
379        &mut self.inner
380    }
381}