sqlx_core_oldapi/pool/
connection.rs1use std::fmt::{self, Debug, Formatter};
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use futures_intrusive::sync::SemaphoreReleaser;
7
8use crate::connection::Connection;
9use crate::database::Database;
10use crate::error::Error;
11
12use super::inner::{DecrementSizeGuard, PoolInner};
13use crate::pool::options::PoolConnectionMetadata;
14use std::future::Future;
15
16pub struct PoolConnection<DB: Database> {
20 live: Option<Live<DB>>,
21 pub(crate) pool: Arc<PoolInner<DB>>,
22}
23
24pub(super) struct Live<DB: Database> {
25 pub(super) raw: DB::Connection,
26 pub(super) created_at: Instant,
27}
28
29pub(super) struct Idle<DB: Database> {
30 pub(super) live: Live<DB>,
31 pub(super) idle_since: Instant,
32}
33
34pub(super) struct Floating<DB: Database, C> {
36 pub(super) inner: C,
37 pub(super) guard: DecrementSizeGuard<DB>,
38}
39
/// Panic message used when a `PoolConnection` is accessed after its inner
/// connection has already been taken out.
const EXPECT_MSG: &str = "BUG: inner connection already taken!";
41
42impl<DB: Database> Debug for PoolConnection<DB> {
43 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
44 f.debug_struct("PoolConnection").finish()
46 }
47}
48
49impl<DB: Database> Deref for PoolConnection<DB> {
50 type Target = DB::Connection;
51
52 fn deref(&self) -> &Self::Target {
53 &self.live.as_ref().expect(EXPECT_MSG).raw
54 }
55}
56
57impl<DB: Database> DerefMut for PoolConnection<DB> {
58 fn deref_mut(&mut self) -> &mut Self::Target {
59 &mut self.live.as_mut().expect(EXPECT_MSG).raw
60 }
61}
62
63impl<DB: Database> AsRef<DB::Connection> for PoolConnection<DB> {
64 fn as_ref(&self) -> &DB::Connection {
65 self
66 }
67}
68
69impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
70 fn as_mut(&mut self) -> &mut DB::Connection {
71 self
72 }
73}
74
75impl<DB: Database> PoolConnection<DB> {
76 pub fn detach(mut self) -> DB::Connection {
89 self.take_live().float(self.pool.clone()).detach()
90 }
91
92 pub fn leak(mut self) -> DB::Connection {
98 self.take_live().raw
99 }
100
101 fn take_live(&mut self) -> Live<DB> {
102 self.live.take().expect(EXPECT_MSG)
103 }
104
105 pub(crate) fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
109 let floating: Option<Floating<DB, Live<DB>>> =
114 self.live.take().map(|live| live.float(self.pool.clone()));
115
116 let pool = self.pool.clone();
117
118 async move {
119 let returned_to_pool = if let Some(floating) = floating {
120 floating.return_to_pool().await
121 } else {
122 false
123 };
124
125 if !returned_to_pool {
126 pool.min_connections_maintenance(None).await;
127 }
128 }
129 }
130}
131
132impl<DB: Database> Drop for PoolConnection<DB> {
134 fn drop(&mut self) {
135 if self.live.is_some() || self.pool.options.min_connections > 0 {
137 if let Ok(handle) = sqlx_rt::Handle::try_current() {
138 handle.spawn(self.return_to_pool());
139 }
140 }
141 }
142}
143
144impl<DB: Database> Live<DB> {
145 pub fn float(self, pool: Arc<PoolInner<DB>>) -> Floating<DB, Self> {
146 Floating {
147 inner: self,
148 guard: DecrementSizeGuard::new_permit(pool),
150 }
151 }
152
153 pub fn into_idle(self) -> Idle<DB> {
154 Idle {
155 live: self,
156 idle_since: Instant::now(),
157 }
158 }
159}
160
161impl<DB: Database> Deref for Idle<DB> {
162 type Target = Live<DB>;
163
164 fn deref(&self) -> &Self::Target {
165 &self.live
166 }
167}
168
169impl<DB: Database> DerefMut for Idle<DB> {
170 fn deref_mut(&mut self) -> &mut Self::Target {
171 &mut self.live
172 }
173}
174
175impl<DB: Database> Floating<DB, Live<DB>> {
176 pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
177 Self {
178 inner: Live {
179 raw: conn,
180 created_at: Instant::now(),
181 },
182 guard,
183 }
184 }
185
186 pub fn reattach(self) -> PoolConnection<DB> {
187 let Floating { inner, guard } = self;
188
189 let pool = Arc::clone(&guard.pool);
190
191 guard.cancel();
192 PoolConnection {
193 live: Some(inner),
194 pool,
195 }
196 }
197
198 pub fn release(self) {
199 self.guard.pool.clone().release(self);
200 }
201
202 async fn return_to_pool(mut self) -> bool {
206 if self.guard.pool.is_closed() {
208 self.close().await;
209 return false;
210 }
211
212 if let Some(test) = &self.guard.pool.options.after_release {
213 let meta = self.metadata();
214 match (test)(&mut self.inner.raw, meta).await {
215 Ok(true) => (),
216 Ok(false) => {
217 self.close().await;
218 return false;
219 }
220 Err(e) => {
221 log::warn!("error from after_release: {}", e);
222 self.close_hard().await;
225 return false;
226 }
227 }
228 }
229
230 if let Err(e) = self.raw.ping().await {
238 log::warn!(
239 "error occurred while testing the connection on-release: {}",
240 e
241 );
242
243 self.close_hard().await;
245 false
246 } else {
247 self.release();
249 true
250 }
251 }
252
253 pub async fn close(self) {
254 let _ = self.inner.raw.close().await;
256
257 }
259
260 pub async fn close_hard(self) {
261 let _ = self.inner.raw.close_hard().await;
262 }
263
264 pub fn detach(self) -> DB::Connection {
265 self.inner.raw
266 }
267
268 pub fn into_idle(self) -> Floating<DB, Idle<DB>> {
269 Floating {
270 inner: self.inner.into_idle(),
271 guard: self.guard,
272 }
273 }
274
275 pub fn metadata(&self) -> PoolConnectionMetadata {
276 PoolConnectionMetadata {
277 age: self.created_at.elapsed(),
278 idle_for: Duration::ZERO,
279 }
280 }
281}
282
283impl<DB: Database> Floating<DB, Idle<DB>> {
284 pub fn from_idle(
285 idle: Idle<DB>,
286 pool: Arc<PoolInner<DB>>,
287 permit: SemaphoreReleaser<'_>,
288 ) -> Self {
289 Self {
290 inner: idle,
291 guard: DecrementSizeGuard::from_permit(pool, permit),
292 }
293 }
294
295 pub async fn ping(&mut self) -> Result<(), Error> {
296 self.live.raw.ping().await
297 }
298
299 pub fn into_live(self) -> Floating<DB, Live<DB>> {
300 Floating {
301 inner: self.inner.live,
302 guard: self.guard,
303 }
304 }
305
306 pub async fn close(self) -> DecrementSizeGuard<DB> {
307 if let Err(e) = self.inner.live.raw.close().await {
308 log::debug!("error occurred while closing the pool connection: {}", e);
309 }
310 self.guard
311 }
312
313 pub async fn close_hard(self) -> DecrementSizeGuard<DB> {
314 let _ = self.inner.live.raw.close_hard().await;
315
316 self.guard
317 }
318
319 pub fn metadata(&self) -> PoolConnectionMetadata {
320 let now = Instant::now();
322
323 PoolConnectionMetadata {
324 age: now.saturating_duration_since(self.created_at),
327 idle_for: now.saturating_duration_since(self.idle_since),
328 }
329 }
330}
331
332impl<DB: Database, C> Deref for Floating<DB, C> {
333 type Target = C;
334
335 fn deref(&self) -> &Self::Target {
336 &self.inner
337 }
338}
339
340impl<DB: Database, C> DerefMut for Floating<DB, C> {
341 fn deref_mut(&mut self) -> &mut Self::Target {
342 &mut self.inner
343 }
344}