Skip to main content

sqlx_core_oldapi/pool/
connection.rs

1use std::fmt::{self, Debug, Formatter};
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use futures_intrusive::sync::SemaphoreReleaser;
7
8use crate::connection::Connection;
9use crate::database::Database;
10use crate::error::Error;
11
12use super::inner::{DecrementSizeGuard, PoolInner};
13use crate::pool::options::PoolConnectionMetadata;
14use std::future::Future;
15
16/// A connection managed by a [`Pool`][crate::pool::Pool].
17///
18/// Will be returned to the pool on-drop.
19pub struct PoolConnection<DB: Database> {
20    live: Option<Live<DB>>,
21    pub(crate) pool: Arc<PoolInner<DB>>,
22}
23
24pub(super) struct Live<DB: Database> {
25    pub(super) raw: DB::Connection,
26    pub(super) created_at: Instant,
27}
28
29pub(super) struct Idle<DB: Database> {
30    pub(super) live: Live<DB>,
31    pub(super) idle_since: Instant,
32}
33
34/// RAII wrapper for connections being handled by functions that may drop them
35pub(super) struct Floating<DB: Database, C> {
36    pub(super) inner: C,
37    pub(super) guard: DecrementSizeGuard<DB>,
38}
39
/// Panic message used when a `PoolConnection` is accessed after its inner
/// connection has already been taken out.
const EXPECT_MSG: &str = "BUG: inner connection already taken!";
41
42impl<DB: Database> Debug for PoolConnection<DB> {
43    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
44        // TODO: Show the type name of the connection ?
45        f.debug_struct("PoolConnection").finish()
46    }
47}
48
49impl<DB: Database> Deref for PoolConnection<DB> {
50    type Target = DB::Connection;
51
52    fn deref(&self) -> &Self::Target {
53        &self.live.as_ref().expect(EXPECT_MSG).raw
54    }
55}
56
57impl<DB: Database> DerefMut for PoolConnection<DB> {
58    fn deref_mut(&mut self) -> &mut Self::Target {
59        &mut self.live.as_mut().expect(EXPECT_MSG).raw
60    }
61}
62
63impl<DB: Database> AsRef<DB::Connection> for PoolConnection<DB> {
64    fn as_ref(&self) -> &DB::Connection {
65        self
66    }
67}
68
69impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
70    fn as_mut(&mut self) -> &mut DB::Connection {
71        self
72    }
73}
74
75impl<DB: Database> PoolConnection<DB> {
76    /// Detach this connection from the pool, allowing it to open a replacement.
77    ///
78    /// Note that if your application uses a single shared pool, this
79    /// effectively lets the application exceed the [`max_connections`] setting.
80    ///
81    /// If [`min_connections`] is nonzero, a task will be spawned to replace this connection.
82    ///
83    /// If you want the pool to treat this connection as permanently checked-out,
84    /// use [`.leak()`][Self::leak] instead.
85    ///
86    /// [`max_connections`]: crate::pool::PoolOptions::max_connections
87    /// [`min_connections`]: crate::pool::PoolOptions::min_connections
88    pub fn detach(mut self) -> DB::Connection {
89        self.take_live().float(self.pool.clone()).detach()
90    }
91
92    /// Detach this connection from the pool, treating it as permanently checked-out.
93    ///
94    /// This effectively will reduce the maximum capacity of the pool by 1 every time it is used.
95    ///
96    /// If you don't want to impact the pool's capacity, use [`.detach()`][Self::detach] instead.
97    pub fn leak(mut self) -> DB::Connection {
98        self.take_live().raw
99    }
100
101    fn take_live(&mut self) -> Live<DB> {
102        self.live.take().expect(EXPECT_MSG)
103    }
104
105    /// Test the connection to make sure it is still live before returning it to the pool.
106    ///
107    /// This effectively runs the drop handler eagerly instead of spawning a task to do it.
108    pub(crate) fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
109        // float the connection in the pool before we move into the task
110        // in case the returned `Future` isn't executed, like if it's spawned into a dying runtime
111        // https://github.com/launchbadge/sqlx/issues/1396
112        // Type hints seem to be broken by `Option` combinators in IntelliJ Rust right now (6/22).
113        let floating: Option<Floating<DB, Live<DB>>> =
114            self.live.take().map(|live| live.float(self.pool.clone()));
115
116        let pool = self.pool.clone();
117
118        async move {
119            let returned_to_pool = if let Some(floating) = floating {
120                floating.return_to_pool().await
121            } else {
122                false
123            };
124
125            if !returned_to_pool {
126                pool.min_connections_maintenance(None).await;
127            }
128        }
129    }
130}
131
132/// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from.
133impl<DB: Database> Drop for PoolConnection<DB> {
134    fn drop(&mut self) {
135        // We still need to spawn a task to maintain `min_connections`.
136        if self.live.is_some() || self.pool.options.min_connections > 0 {
137            if let Ok(handle) = sqlx_rt::Handle::try_current() {
138                handle.spawn(self.return_to_pool());
139            }
140        }
141    }
142}
143
144impl<DB: Database> Live<DB> {
145    pub fn float(self, pool: Arc<PoolInner<DB>>) -> Floating<DB, Self> {
146        Floating {
147            inner: self,
148            // create a new guard from a previously leaked permit
149            guard: DecrementSizeGuard::new_permit(pool),
150        }
151    }
152
153    pub fn into_idle(self) -> Idle<DB> {
154        Idle {
155            live: self,
156            idle_since: Instant::now(),
157        }
158    }
159}
160
161impl<DB: Database> Deref for Idle<DB> {
162    type Target = Live<DB>;
163
164    fn deref(&self) -> &Self::Target {
165        &self.live
166    }
167}
168
169impl<DB: Database> DerefMut for Idle<DB> {
170    fn deref_mut(&mut self) -> &mut Self::Target {
171        &mut self.live
172    }
173}
174
175impl<DB: Database> Floating<DB, Live<DB>> {
176    pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
177        Self {
178            inner: Live {
179                raw: conn,
180                created_at: Instant::now(),
181            },
182            guard,
183        }
184    }
185
186    pub fn reattach(self) -> PoolConnection<DB> {
187        let Floating { inner, guard } = self;
188
189        let pool = Arc::clone(&guard.pool);
190
191        guard.cancel();
192        PoolConnection {
193            live: Some(inner),
194            pool,
195        }
196    }
197
198    pub fn release(self) {
199        self.guard.pool.clone().release(self);
200    }
201
202    /// Return the connection to the pool.
203    ///
204    /// Returns `true` if the connection was successfully returned, `false` if it was closed.
205    async fn return_to_pool(mut self) -> bool {
206        // Immediately close the connection.
207        if self.guard.pool.is_closed() {
208            self.close().await;
209            return false;
210        }
211
212        if let Some(test) = &self.guard.pool.options.after_release {
213            let meta = self.metadata();
214            match (test)(&mut self.inner.raw, meta).await {
215                Ok(true) => (),
216                Ok(false) => {
217                    self.close().await;
218                    return false;
219                }
220                Err(e) => {
221                    log::warn!("error from after_release: {}", e);
222                    // Connection is broken, don't try to gracefully close as
223                    // something weird might happen.
224                    self.close_hard().await;
225                    return false;
226                }
227            }
228        }
229
230        // test the connection on-release to ensure it is still viable,
231        // and flush anything time-sensitive like transaction rollbacks
232        // if an Executor future/stream is dropped during an `.await` call, the connection
233        // is likely to be left in an inconsistent state, in which case it should not be
234        // returned to the pool; also of course, if it was dropped due to an error
235        // this is simply a band-aid as SQLx-next connections should be able
236        // to recover from cancellations
237        if let Err(e) = self.raw.ping().await {
238            log::warn!(
239                "error occurred while testing the connection on-release: {}",
240                e
241            );
242
243            // Connection is broken, don't try to gracefully close.
244            self.close_hard().await;
245            false
246        } else {
247            // if the connection is still viable, release it to the pool
248            self.release();
249            true
250        }
251    }
252
253    pub async fn close(self) {
254        // This isn't used anywhere that we care about the return value
255        let _ = self.inner.raw.close().await;
256
257        // `guard` is dropped as intended
258    }
259
260    pub async fn close_hard(self) {
261        let _ = self.inner.raw.close_hard().await;
262    }
263
264    pub fn detach(self) -> DB::Connection {
265        self.inner.raw
266    }
267
268    pub fn into_idle(self) -> Floating<DB, Idle<DB>> {
269        Floating {
270            inner: self.inner.into_idle(),
271            guard: self.guard,
272        }
273    }
274
275    pub fn metadata(&self) -> PoolConnectionMetadata {
276        PoolConnectionMetadata {
277            age: self.created_at.elapsed(),
278            idle_for: Duration::ZERO,
279        }
280    }
281}
282
283impl<DB: Database> Floating<DB, Idle<DB>> {
284    pub fn from_idle(
285        idle: Idle<DB>,
286        pool: Arc<PoolInner<DB>>,
287        permit: SemaphoreReleaser<'_>,
288    ) -> Self {
289        Self {
290            inner: idle,
291            guard: DecrementSizeGuard::from_permit(pool, permit),
292        }
293    }
294
295    pub async fn ping(&mut self) -> Result<(), Error> {
296        self.live.raw.ping().await
297    }
298
299    pub fn into_live(self) -> Floating<DB, Live<DB>> {
300        Floating {
301            inner: self.inner.live,
302            guard: self.guard,
303        }
304    }
305
306    pub async fn close(self) -> DecrementSizeGuard<DB> {
307        if let Err(e) = self.inner.live.raw.close().await {
308            log::debug!("error occurred while closing the pool connection: {}", e);
309        }
310        self.guard
311    }
312
313    pub async fn close_hard(self) -> DecrementSizeGuard<DB> {
314        let _ = self.inner.live.raw.close_hard().await;
315
316        self.guard
317    }
318
319    pub fn metadata(&self) -> PoolConnectionMetadata {
320        // Use a single `now` value for consistency.
321        let now = Instant::now();
322
323        PoolConnectionMetadata {
324            // NOTE: the receiver is the later `Instant` and the arg is the earlier
325            // https://github.com/launchbadge/sqlx/issues/1912
326            age: now.saturating_duration_since(self.created_at),
327            idle_for: now.saturating_duration_since(self.idle_since),
328        }
329    }
330}
331
332impl<DB: Database, C> Deref for Floating<DB, C> {
333    type Target = C;
334
335    fn deref(&self) -> &Self::Target {
336        &self.inner
337    }
338}
339
340impl<DB: Database, C> DerefMut for Floating<DB, C> {
341    fn deref_mut(&mut self) -> &mut Self::Target {
342        &mut self.inner
343    }
344}