sqlx_build_trust_core/pool/
connection.rs1use std::fmt::{self, Debug, Formatter};
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::sync::AsyncSemaphoreReleaser;
7
8use crate::connection::Connection;
9use crate::database::Database;
10use crate::error::Error;
11
12use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner};
13use crate::pool::options::PoolConnectionMetadata;
14use std::future::Future;
15
16pub struct PoolConnection<DB: Database> {
20 live: Option<Live<DB>>,
21 pub(crate) pool: Arc<PoolInner<DB>>,
22}
23
24pub(super) struct Live<DB: Database> {
25 pub(super) raw: DB::Connection,
26 pub(super) created_at: Instant,
27}
28
29pub(super) struct Idle<DB: Database> {
30 pub(super) live: Live<DB>,
31 pub(super) idle_since: Instant,
32}
33
34pub(super) struct Floating<DB: Database, C> {
36 pub(super) inner: C,
37 pub(super) guard: DecrementSizeGuard<DB>,
38}
39
// Panic message used when the `live` slot of a `PoolConnection` is unexpectedly
// empty, i.e. the inner connection was already taken out of the handle.
const EXPECT_MSG: &str = "BUG: inner connection already taken!";
41
42impl<DB: Database> Debug for PoolConnection<DB> {
43 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
44 f.debug_struct("PoolConnection").finish()
46 }
47}
48
49impl<DB: Database> Deref for PoolConnection<DB> {
50 type Target = DB::Connection;
51
52 fn deref(&self) -> &Self::Target {
53 &self.live.as_ref().expect(EXPECT_MSG).raw
54 }
55}
56
57impl<DB: Database> DerefMut for PoolConnection<DB> {
58 fn deref_mut(&mut self) -> &mut Self::Target {
59 &mut self.live.as_mut().expect(EXPECT_MSG).raw
60 }
61}
62
63impl<DB: Database> AsRef<DB::Connection> for PoolConnection<DB> {
64 fn as_ref(&self) -> &DB::Connection {
65 self
66 }
67}
68
69impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
70 fn as_mut(&mut self) -> &mut DB::Connection {
71 self
72 }
73}
74
75impl<DB: Database> PoolConnection<DB> {
76 pub async fn close(mut self) -> Result<(), Error> {
84 let floating = self.take_live().float(self.pool.clone());
85 floating.inner.raw.close().await
86 }
87
88 pub fn detach(mut self) -> DB::Connection {
101 self.take_live().float(self.pool.clone()).detach()
102 }
103
104 pub fn leak(mut self) -> DB::Connection {
110 self.take_live().raw
111 }
112
113 fn take_live(&mut self) -> Live<DB> {
114 self.live.take().expect(EXPECT_MSG)
115 }
116
117 #[doc(hidden)]
121 pub fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
122 let floating: Option<Floating<DB, Live<DB>>> =
127 self.live.take().map(|live| live.float(self.pool.clone()));
128
129 let pool = self.pool.clone();
130
131 async move {
132 let returned_to_pool = if let Some(floating) = floating {
133 floating.return_to_pool().await
134 } else {
135 false
136 };
137
138 if !returned_to_pool {
139 pool.min_connections_maintenance(None).await;
140 }
141 }
142 }
143}
144
145impl<'c, DB: Database> crate::acquire::Acquire<'c> for &'c mut PoolConnection<DB> {
146 type Database = DB;
147
148 type Connection = &'c mut <DB as Database>::Connection;
149
150 #[inline]
151 fn acquire(self) -> futures_core::future::BoxFuture<'c, Result<Self::Connection, Error>> {
152 Box::pin(futures_util::future::ok(&mut **self))
153 }
154
155 #[inline]
156 fn begin(
157 self,
158 ) -> futures_core::future::BoxFuture<'c, Result<crate::transaction::Transaction<'c, DB>, Error>>
159 {
160 crate::transaction::Transaction::begin(&mut **self)
161 }
162}
163
164impl<DB: Database> Drop for PoolConnection<DB> {
166 fn drop(&mut self) {
167 if self.live.is_some() || self.pool.options.min_connections > 0 {
169 crate::rt::spawn(self.return_to_pool());
170 }
171 }
172}
173
174impl<DB: Database> Live<DB> {
175 pub fn float(self, pool: Arc<PoolInner<DB>>) -> Floating<DB, Self> {
176 Floating {
177 inner: self,
178 guard: DecrementSizeGuard::new_permit(pool),
180 }
181 }
182
183 pub fn into_idle(self) -> Idle<DB> {
184 Idle {
185 live: self,
186 idle_since: Instant::now(),
187 }
188 }
189}
190
191impl<DB: Database> Deref for Idle<DB> {
192 type Target = Live<DB>;
193
194 fn deref(&self) -> &Self::Target {
195 &self.live
196 }
197}
198
199impl<DB: Database> DerefMut for Idle<DB> {
200 fn deref_mut(&mut self) -> &mut Self::Target {
201 &mut self.live
202 }
203}
204
205impl<DB: Database> Floating<DB, Live<DB>> {
206 pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
207 Self {
208 inner: Live {
209 raw: conn,
210 created_at: Instant::now(),
211 },
212 guard,
213 }
214 }
215
216 pub fn reattach(self) -> PoolConnection<DB> {
217 let Floating { inner, guard } = self;
218
219 let pool = Arc::clone(&guard.pool);
220
221 guard.cancel();
222 PoolConnection {
223 live: Some(inner),
224 pool,
225 }
226 }
227
228 pub fn release(self) {
229 self.guard.pool.clone().release(self);
230 }
231
232 async fn return_to_pool(mut self) -> bool {
236 if self.guard.pool.is_closed() {
238 self.close().await;
239 return false;
240 }
241
242 if is_beyond_max_lifetime(&self.inner, &self.guard.pool.options) {
245 self.close().await;
246 return false;
247 }
248
249 if let Some(test) = &self.guard.pool.options.after_release {
250 let meta = self.metadata();
251 match (test)(&mut self.inner.raw, meta).await {
252 Ok(true) => (),
253 Ok(false) => {
254 self.close().await;
255 return false;
256 }
257 Err(error) => {
258 tracing::warn!(%error, "error from `after_release`");
259 self.close_hard().await;
262 return false;
263 }
264 }
265 }
266
267 if let Err(error) = self.raw.ping().await {
275 tracing::warn!(
276 %error,
277 "error occurred while testing the connection on-release",
278 );
279
280 self.close_hard().await;
282 false
283 } else {
284 self.release();
286 true
287 }
288 }
289
290 pub async fn close(self) {
291 let _ = self.inner.raw.close().await;
293
294 }
296
297 pub async fn close_hard(self) {
298 let _ = self.inner.raw.close_hard().await;
299 }
300
301 pub fn detach(self) -> DB::Connection {
302 self.inner.raw
303 }
304
305 pub fn into_idle(self) -> Floating<DB, Idle<DB>> {
306 Floating {
307 inner: self.inner.into_idle(),
308 guard: self.guard,
309 }
310 }
311
312 pub fn metadata(&self) -> PoolConnectionMetadata {
313 PoolConnectionMetadata {
314 age: self.created_at.elapsed(),
315 idle_for: Duration::ZERO,
316 }
317 }
318}
319
320impl<DB: Database> Floating<DB, Idle<DB>> {
321 pub fn from_idle(
322 idle: Idle<DB>,
323 pool: Arc<PoolInner<DB>>,
324 permit: AsyncSemaphoreReleaser<'_>,
325 ) -> Self {
326 Self {
327 inner: idle,
328 guard: DecrementSizeGuard::from_permit(pool, permit),
329 }
330 }
331
332 pub async fn ping(&mut self) -> Result<(), Error> {
333 self.live.raw.ping().await
334 }
335
336 pub fn into_live(self) -> Floating<DB, Live<DB>> {
337 Floating {
338 inner: self.inner.live,
339 guard: self.guard,
340 }
341 }
342
343 pub async fn close(self) -> DecrementSizeGuard<DB> {
344 if let Err(error) = self.inner.live.raw.close().await {
345 tracing::debug!(%error, "error occurred while closing the pool connection");
346 }
347 self.guard
348 }
349
350 pub async fn close_hard(self) -> DecrementSizeGuard<DB> {
351 let _ = self.inner.live.raw.close_hard().await;
352
353 self.guard
354 }
355
356 pub fn metadata(&self) -> PoolConnectionMetadata {
357 let now = Instant::now();
359
360 PoolConnectionMetadata {
361 age: now.saturating_duration_since(self.created_at),
364 idle_for: now.saturating_duration_since(self.idle_since),
365 }
366 }
367}
368
369impl<DB: Database, C> Deref for Floating<DB, C> {
370 type Target = C;
371
372 fn deref(&self) -> &Self::Target {
373 &self.inner
374 }
375}
376
377impl<DB: Database, C> DerefMut for Floating<DB, C> {
378 fn deref_mut(&mut self) -> &mut Self::Target {
379 &mut self.inner
380 }
381}