sqlx_core_oldapi/pool/
connection.rs1use std::fmt::{self, Debug, Formatter};
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use futures_intrusive::sync::SemaphoreReleaser;
7
8use crate::connection::Connection;
9use crate::database::Database;
10use crate::error::Error;
11
12use super::inner::{DecrementSizeGuard, PoolInner};
13use crate::pool::options::PoolConnectionMetadata;
14use std::future::Future;
15
16pub struct PoolConnection<DB: Database> {
20 live: Option<Live<DB>>,
21 pub(crate) pool: Arc<PoolInner<DB>>,
22}
23
24pub(super) struct Live<DB: Database> {
25 pub(super) raw: DB::Connection,
26 pub(super) created_at: Instant,
27}
28
29pub(super) struct Idle<DB: Database> {
30 pub(super) live: Live<DB>,
31 pub(super) idle_since: Instant,
32}
33
34pub(super) struct Floating<DB: Database, C> {
36 pub(super) inner: C,
37 pub(super) guard: DecrementSizeGuard<DB>,
38}
39
40const EXPECT_MSG: &str = "BUG: inner connection already taken!";
41
42impl<DB: Database> Debug for PoolConnection<DB> {
43 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
44 f.debug_struct("PoolConnection").finish()
46 }
47}
48
49impl<DB: Database> Deref for PoolConnection<DB> {
50 type Target = DB::Connection;
51
52 fn deref(&self) -> &Self::Target {
53 &self.live.as_ref().expect(EXPECT_MSG).raw
54 }
55}
56
57impl<DB: Database> DerefMut for PoolConnection<DB> {
58 fn deref_mut(&mut self) -> &mut Self::Target {
59 &mut self.live.as_mut().expect(EXPECT_MSG).raw
60 }
61}
62
63impl<DB: Database> AsRef<DB::Connection> for PoolConnection<DB> {
64 fn as_ref(&self) -> &DB::Connection {
65 self
66 }
67}
68
69impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
70 fn as_mut(&mut self) -> &mut DB::Connection {
71 self
72 }
73}
74
75impl<DB: Database> PoolConnection<DB> {
76 pub fn detach(mut self) -> DB::Connection {
89 self.take_live().float(self.pool.clone()).detach()
90 }
91
92 pub fn leak(mut self) -> DB::Connection {
98 self.take_live().raw
99 }
100
101 fn take_live(&mut self) -> Live<DB> {
102 self.live.take().expect(EXPECT_MSG)
103 }
104
105 pub(crate) fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
109 let floating: Option<Floating<DB, Live<DB>>> =
114 self.live.take().map(|live| live.float(self.pool.clone()));
115
116 let pool = self.pool.clone();
117
118 async move {
119 let returned_to_pool = if let Some(floating) = floating {
120 floating.return_to_pool().await
121 } else {
122 false
123 };
124
125 if !returned_to_pool {
126 pool.min_connections_maintenance(None).await;
127 }
128 }
129 }
130}
131
132impl<DB: Database> Drop for PoolConnection<DB> {
134 fn drop(&mut self) {
135 if self.live.is_some() || self.pool.options.min_connections > 0 {
137 #[cfg(not(feature = "_rt-async-std"))]
138 if let Ok(handle) = sqlx_rt::Handle::try_current() {
139 handle.spawn(self.return_to_pool());
140 }
141
142 #[cfg(feature = "_rt-async-std")]
143 sqlx_rt::spawn(self.return_to_pool());
144 }
145 }
146}
147
148impl<DB: Database> Live<DB> {
149 pub fn float(self, pool: Arc<PoolInner<DB>>) -> Floating<DB, Self> {
150 Floating {
151 inner: self,
152 guard: DecrementSizeGuard::new_permit(pool),
154 }
155 }
156
157 pub fn into_idle(self) -> Idle<DB> {
158 Idle {
159 live: self,
160 idle_since: Instant::now(),
161 }
162 }
163}
164
165impl<DB: Database> Deref for Idle<DB> {
166 type Target = Live<DB>;
167
168 fn deref(&self) -> &Self::Target {
169 &self.live
170 }
171}
172
173impl<DB: Database> DerefMut for Idle<DB> {
174 fn deref_mut(&mut self) -> &mut Self::Target {
175 &mut self.live
176 }
177}
178
179impl<DB: Database> Floating<DB, Live<DB>> {
180 pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
181 Self {
182 inner: Live {
183 raw: conn,
184 created_at: Instant::now(),
185 },
186 guard,
187 }
188 }
189
190 pub fn reattach(self) -> PoolConnection<DB> {
191 let Floating { inner, guard } = self;
192
193 let pool = Arc::clone(&guard.pool);
194
195 guard.cancel();
196 PoolConnection {
197 live: Some(inner),
198 pool,
199 }
200 }
201
202 pub fn release(self) {
203 self.guard.pool.clone().release(self);
204 }
205
206 async fn return_to_pool(mut self) -> bool {
210 if self.guard.pool.is_closed() {
212 self.close().await;
213 return false;
214 }
215
216 if let Some(test) = &self.guard.pool.options.after_release {
217 let meta = self.metadata();
218 match (test)(&mut self.inner.raw, meta).await {
219 Ok(true) => (),
220 Ok(false) => {
221 self.close().await;
222 return false;
223 }
224 Err(e) => {
225 log::warn!("error from after_release: {}", e);
226 self.close_hard().await;
229 return false;
230 }
231 }
232 }
233
234 if let Err(e) = self.raw.ping().await {
242 log::warn!(
243 "error occurred while testing the connection on-release: {}",
244 e
245 );
246
247 self.close_hard().await;
249 false
250 } else {
251 self.release();
253 true
254 }
255 }
256
257 pub async fn close(self) {
258 let _ = self.inner.raw.close().await;
260
261 }
263
264 pub async fn close_hard(self) {
265 let _ = self.inner.raw.close_hard().await;
266 }
267
268 pub fn detach(self) -> DB::Connection {
269 self.inner.raw
270 }
271
272 pub fn into_idle(self) -> Floating<DB, Idle<DB>> {
273 Floating {
274 inner: self.inner.into_idle(),
275 guard: self.guard,
276 }
277 }
278
279 pub fn metadata(&self) -> PoolConnectionMetadata {
280 PoolConnectionMetadata {
281 age: self.created_at.elapsed(),
282 idle_for: Duration::ZERO,
283 }
284 }
285}
286
287impl<DB: Database> Floating<DB, Idle<DB>> {
288 pub fn from_idle(
289 idle: Idle<DB>,
290 pool: Arc<PoolInner<DB>>,
291 permit: SemaphoreReleaser<'_>,
292 ) -> Self {
293 Self {
294 inner: idle,
295 guard: DecrementSizeGuard::from_permit(pool, permit),
296 }
297 }
298
299 pub async fn ping(&mut self) -> Result<(), Error> {
300 self.live.raw.ping().await
301 }
302
303 pub fn into_live(self) -> Floating<DB, Live<DB>> {
304 Floating {
305 inner: self.inner.live,
306 guard: self.guard,
307 }
308 }
309
310 pub async fn close(self) -> DecrementSizeGuard<DB> {
311 if let Err(e) = self.inner.live.raw.close().await {
312 log::debug!("error occurred while closing the pool connection: {}", e);
313 }
314 self.guard
315 }
316
317 pub async fn close_hard(self) -> DecrementSizeGuard<DB> {
318 let _ = self.inner.live.raw.close_hard().await;
319
320 self.guard
321 }
322
323 pub fn metadata(&self) -> PoolConnectionMetadata {
324 let now = Instant::now();
326
327 PoolConnectionMetadata {
328 age: now.saturating_duration_since(self.created_at),
331 idle_for: now.saturating_duration_since(self.idle_since),
332 }
333 }
334}
335
336impl<DB: Database, C> Deref for Floating<DB, C> {
337 type Target = C;
338
339 fn deref(&self) -> &Self::Target {
340 &self.inner
341 }
342}
343
344impl<DB: Database, C> DerefMut for Floating<DB, C> {
345 fn deref_mut(&mut self) -> &mut Self::Target {
346 &mut self.inner
347 }
348}