1use core::fmt;
2use core::future::Future;
3use core::ops::{Deref, DerefMut};
4use core::sync::atomic::{AtomicBool, Ordering};
5#[cfg(not(feature = "no-send"))]
6use std::sync::{Arc as WrapPointer, Weak};
7use std::time::Instant;
8#[cfg(feature = "no-send")]
9use std::{
10 rc::{Rc as WrapPointer, Weak},
11 thread::{self, ThreadId},
12};
13
14use crate::builder::Builder;
15use crate::manager::Manager;
16use crate::pool_inner::{PoolLock, State};
17
18pub struct Conn<M: Manager> {
19 conn: M::Connection,
20 marker: usize,
21 birth: Instant,
22}
23
24impl<M: Manager> Conn<M> {
25 pub(crate) fn marker(&self) -> usize {
26 self.marker
27 }
28}
29
30pub struct IdleConn<M: Manager> {
31 conn: Conn<M>,
32 idle_start: Instant,
33}
34
35impl<M: Manager> IdleConn<M> {
36 fn new(conn: M::Connection, marker: usize) -> Self {
37 let now = Instant::now();
38 IdleConn {
39 conn: Conn {
40 conn,
41 marker,
42 birth: now,
43 },
44 idle_start: now,
45 }
46 }
47
48 #[inline]
49 pub(crate) fn marker(&self) -> usize {
50 self.conn.marker
51 }
52}
53
54impl<M: Manager> From<Conn<M>> for IdleConn<M> {
55 fn from(conn: Conn<M>) -> IdleConn<M> {
56 let now = Instant::now();
57 IdleConn {
58 conn,
59 idle_start: now,
60 }
61 }
62}
63
64impl<M: Manager> From<IdleConn<M>> for Conn<M> {
65 fn from(conn: IdleConn<M>) -> Conn<M> {
66 Conn {
67 conn: conn.conn.conn,
68 birth: conn.conn.birth,
69 marker: conn.conn.marker,
70 }
71 }
72}
73
74pub struct ManagedPool<M: Manager> {
75 builder: Builder,
76 manager: M,
77 running: AtomicBool,
78 pool_lock: PoolLock<M>,
79}
80
81impl<M: Manager> ManagedPool<M> {
82 fn new(builder: Builder, manager: M) -> Self {
83 let pool_lock = PoolLock::from_builder(&builder);
84
85 Self {
86 builder,
87 manager,
88 running: AtomicBool::new(true),
89 pool_lock,
90 }
91 }
92
93 #[inline]
94 async fn get_conn<'a, R>(&'a self, shared_pool: &'a SharedManagedPool<M>) -> Result<R, M::Error>
95 where
96 R: PoolRefBehavior<'a, M> + Unpin,
97 {
98 let fut = self.pool_lock.lock::<R>(shared_pool);
99 let timeout = self.builder.wait_timeout;
100
101 let mut pool_ref = self.manager.timeout(fut, timeout).await?;
102
103 if self.builder.always_check {
104 let mut retry = 0u8;
105 loop {
106 let result = self.check_conn(&mut pool_ref).await;
107 match result {
108 Ok(Ok(_)) => break,
109 Ok(Err(e)) => {
110 pool_ref.take_drop();
111 if retry == 3 {
112 return Err(e);
113 } else {
114 retry += 1;
115 };
116 }
117 Err(timeout_err) => {
118 pool_ref.take_drop();
119 return Err(timeout_err);
120 }
121 }
122
123 let fut = self.pool_lock.lock::<R>(shared_pool);
124
125 pool_ref = self.manager.timeout(fut, timeout).await?;
126 }
127 };
128
129 Ok(pool_ref)
130 }
131
132 fn drop_conn(&self, marker: usize, should_spawn_new: bool) -> Option<usize> {
133 self.pool_lock.dec_spawned(marker, should_spawn_new)
136 }
137
138 pub(crate) async fn add_idle_conn(&self, marker: usize) -> Result<(), M::Error> {
139 let fut = self.manager.connect();
140 let timeout = self.builder.connection_timeout;
141
142 let conn = self
143 .manager
144 .timeout(fut, timeout)
145 .await
146 .map_err(|e| {
147 self.pool_lock.dec_pending(1);
148 e
149 })?
150 .map_err(|e| {
151 self.pool_lock.dec_pending(1);
152 e
153 })?;
154
155 self.pool_lock
156 .put_back_inc_spawned(IdleConn::new(conn, marker));
157
158 Ok(())
159 }
160
161 async fn check_conn(&self, conn: &mut M::Connection) -> Result<Result<(), M::Error>, M::Error> {
162 let fut = self.manager.is_valid(conn);
163
164 let timeout = self.builder.connection_timeout;
165
166 let res = self.manager.timeout(fut, timeout).await?;
167
168 Ok(res)
169 }
170
171 async fn replenish_idle_conn(
172 &self,
173 pending_count: usize,
174 marker: usize,
175 ) -> Result<(), M::Error> {
176 for i in 0..pending_count {
177 self.add_idle_conn(marker).await.map_err(|e| {
178 let count = pending_count - i - 1;
181 if count > 0 {
182 self.pool_lock.dec_pending(count);
183 };
184 e
185 })?;
186 }
187
188 Ok(())
189 }
190
191 fn if_running(&self, running: bool) {
192 self.running.store(running, Ordering::Release);
193 }
194
195 pub(crate) fn is_running(&self) -> bool {
196 self.running.load(Ordering::Acquire)
197 }
198
199 #[cfg(not(feature = "no-send"))]
200 pub(crate) fn spawn<Fut>(&self, fut: Fut)
201 where
202 Fut: Future<Output = ()> + Send + 'static,
203 {
204 self.manager.spawn(fut);
205 }
206
207 #[cfg(feature = "no-send")]
208 pub(crate) fn spawn<Fut>(&self, fut: Fut)
209 where
210 Fut: Future<Output = ()> + 'static,
211 {
212 self.manager.spawn(fut);
213 }
214
215 pub async fn reap_idle_conn(&self) -> Result<(), M::Error> {
216 let now = Instant::now();
217
218 let pending_new = self.pool_lock.try_drop_conn(|conn| {
219 let mut should_drop = false;
220 if let Some(timeout) = self.builder.idle_timeout {
221 should_drop |= now >= conn.idle_start + timeout;
222 }
223 if let Some(lifetime) = self.builder.max_lifetime {
224 should_drop |= now >= conn.conn.birth + lifetime;
225 }
226 should_drop
227 });
228
229 match pending_new {
230 Some((pending_new, marker)) => self.replenish_idle_conn(pending_new, marker).await,
231 None => Ok(()),
232 }
233 }
234
235 pub fn garbage_collect(&self) {
236 self.pool_lock
237 .drop_pending(|pending| pending.should_remove(self.builder.connection_timeout));
238 }
239
240 pub fn get_builder(&self) -> &Builder {
242 &self.builder
243 }
244}
245
246pub type SharedManagedPool<M> = WrapPointer<ManagedPool<M>>;
247
248pub type WeakSharedManagedPool<M> = Weak<ManagedPool<M>>;
249
250pub struct Pool<M: Manager> {
251 pool: SharedManagedPool<M>,
252 #[cfg(feature = "no-send")]
253 id: ThreadId,
254}
255
256impl<M: Manager> Clone for Pool<M> {
257 fn clone(&self) -> Self {
258 Self {
259 pool: self.pool.clone(),
260 #[cfg(feature = "no-send")]
261 id: self.id,
262 }
263 }
264}
265
266impl<M: Manager> fmt::Debug for Pool<M> {
267 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
268 f.write_fmt(format_args!("Pool({:p})", self.pool))
269 }
270}
271
272impl<M: Manager> Drop for Pool<M> {
273 fn drop(&mut self) {
274 self.pool.manager.on_stop();
275 }
276}
277
278impl<M: Manager> Pool<M> {
279 #[must_use = "futures do nothing unless you `.await` or poll them"]
283 pub async fn get(&self) -> Result<PoolRef<'_, M>, M::Error> {
284 let shared_pool = &self.pool;
285
286 shared_pool.get_conn(shared_pool).await
287 }
288
289 #[must_use = "futures do nothing unless you `.await` or poll them"]
293 pub async fn get_owned(&self) -> Result<PoolRefOwned<M>, M::Error> {
294 let shared_pool = &self.pool;
295
296 shared_pool.get_conn(shared_pool).await
297 }
298
299 #[must_use = "futures do nothing unless you `.await` or poll them"]
309 pub async fn run<'a, T, E, F, FF>(&'a self, f: F) -> Result<T, E>
310 where
311 F: FnOnce(PoolRef<'a, M>) -> FF,
312 FF: Future<Output = Result<T, E>> + Send + 'a,
313 E: From<M::Error>,
314 T: Send + 'static,
315 {
316 let pool_ref = self.get().await?;
317 f(pool_ref).await
318 }
319
320 pub fn pause(&self) {
329 self.pool.if_running(false);
330 }
331
332 pub fn resume(&self) {
335 self.pool.if_running(true);
336 }
337
338 pub fn running(&self) -> bool {
340 self.pool.is_running()
341 }
342
343 pub fn clear(&self) {
351 self.pool.pool_lock.clear()
352 }
353
354 #[must_use = "futures do nothing unless you `.await` or poll them"]
384 pub async fn init(&self) -> Result<(), M::Error> {
385 let shared_pool = &self.pool;
386
387 let marker = shared_pool.pool_lock.marker();
388
389 shared_pool
390 .replenish_idle_conn(shared_pool.builder.min_idle, marker)
391 .await?;
392
393 shared_pool.manager.on_start(shared_pool);
394
395 Ok(())
396 }
397
398 pub fn set_max_size(&self, size: usize) {
403 self.pool.pool_lock.set_max_size(size);
404 }
405
406 pub fn set_min_idle(&self, size: usize) {
411 self.pool.pool_lock.set_min_idle(size);
412 }
413
414 pub fn get_manager(&self) -> &M {
416 &self.pool.manager
417 }
418
419 pub fn state(&self) -> State {
421 self.pool.pool_lock.state()
422 }
423
424 #[cfg(feature = "no-send")]
426 pub fn thread_id(&self) -> ThreadId {
427 self.id
428 }
429
430 pub(crate) fn new(builder: Builder, manager: M) -> Self {
431 let pool = ManagedPool::new(builder, manager);
432 Pool {
433 pool: WrapPointer::new(pool),
434 #[cfg(feature = "no-send")]
435 id: thread::current().id(),
436 }
437 }
438}
439
440pub struct PoolRef<'a, M: Manager> {
441 conn: Option<Conn<M>>,
442 shared_pool: &'a SharedManagedPool<M>,
443 marker: Option<usize>,
445}
446
447pub struct PoolRefOwned<M: Manager> {
448 conn: Option<Conn<M>>,
449 shared_pool: WeakSharedManagedPool<M>,
450 marker: Option<usize>,
451}
452
453impl<M: Manager> PoolRef<'_, M> {
454 pub fn get_conn(&mut self) -> &mut M::Connection {
456 &mut *self
457 }
458
459 pub fn take_conn(&mut self) -> Option<M::Connection> {
461 self.conn.take().map(|c| {
462 self.marker = Some(c.marker);
463 c.conn
464 })
465 }
466
467 pub fn push_conn(&mut self, conn: M::Connection) {
471 let marker = match self.marker {
474 Some(marker) => marker,
475 None => self.conn.as_ref().map(|c| c.marker()).unwrap(),
476 };
477
478 self.conn = Some(Conn {
479 conn,
480 marker,
481 birth: Instant::now(),
482 });
483 }
484
485 pub fn get_manager(&self) -> &M {
486 &self.shared_pool.manager
487 }
488}
489
490impl<M: Manager> PoolRefOwned<M> {
491 pub fn get_conn(&mut self) -> &mut M::Connection {
493 &mut *self
494 }
495
496 pub fn take_conn(&mut self) -> Option<M::Connection> {
498 self.conn.take().map(|c| {
499 self.marker = Some(c.marker);
500 c.conn
501 })
502 }
503
504 pub fn push_conn(&mut self, conn: M::Connection) {
508 let marker = match self.marker {
511 Some(marker) => marker,
512 None => self.conn.as_ref().map(|c| c.marker()).unwrap(),
513 };
514
515 self.conn = Some(Conn {
516 conn,
517 marker,
518 birth: Instant::now(),
519 });
520 }
521}
522
523pub(crate) trait PoolRefBehavior<'a, M: Manager>
524where
525 Self: DerefMut<Target = M::Connection> + Sized,
526{
527 fn from_idle(conn: IdleConn<M>, shared_pool: &'a SharedManagedPool<M>) -> Self;
528
529 fn take_drop(self) {}
530}
531
532impl<'re, M: Manager> PoolRefBehavior<'re, M> for PoolRef<'re, M> {
533 fn from_idle(conn: IdleConn<M>, shared_pool: &'re SharedManagedPool<M>) -> Self {
534 Self {
535 conn: Some(conn.into()),
536 shared_pool,
537 marker: None,
538 }
539 }
540
541 fn take_drop(mut self) {
542 let _ = self.take_conn();
543 }
544}
545
546impl<M: Manager> PoolRefBehavior<'_, M> for PoolRefOwned<M> {
547 fn from_idle(conn: IdleConn<M>, shared_pool: &SharedManagedPool<M>) -> Self {
548 Self {
549 conn: Some(conn.into()),
550 shared_pool: WrapPointer::downgrade(shared_pool),
551 marker: None,
552 }
553 }
554
555 fn take_drop(mut self) {
556 let _ = self.take_conn();
557 }
558}
559
560impl<M: Manager> Deref for PoolRef<'_, M> {
561 type Target = M::Connection;
562
563 fn deref(&self) -> &Self::Target {
564 &self
565 .conn
566 .as_ref()
567 .expect("Connection has already been taken")
568 .conn
569 }
570}
571
572impl<M: Manager> DerefMut for PoolRef<'_, M> {
573 fn deref_mut(&mut self) -> &mut Self::Target {
574 &mut self
575 .conn
576 .as_mut()
577 .expect("Connection has already been taken")
578 .conn
579 }
580}
581
582impl<M: Manager> Deref for PoolRefOwned<M> {
583 type Target = M::Connection;
584
585 fn deref(&self) -> &Self::Target {
586 &self
587 .conn
588 .as_ref()
589 .expect("Connection has already been taken")
590 .conn
591 }
592}
593
594impl<M: Manager> DerefMut for PoolRefOwned<M> {
595 fn deref_mut(&mut self) -> &mut Self::Target {
596 &mut self
597 .conn
598 .as_mut()
599 .expect("Connection has already been taken")
600 .conn
601 }
602}
603
604impl<M: Manager> Drop for PoolRef<'_, M> {
605 #[inline]
606 fn drop(&mut self) {
607 self.shared_pool.drop_pool_ref(&mut self.conn, self.marker);
608 }
609}
610
611impl<M: Manager> Drop for PoolRefOwned<M> {
612 #[inline]
613 fn drop(&mut self) {
614 if let Some(shared_pool) = self.shared_pool.upgrade() {
615 shared_pool.drop_pool_ref(&mut self.conn, self.marker);
616 }
617 }
618}
619
620trait DropAndSpawn<M: Manager> {
621 fn drop_pool_ref(&self, conn: &mut Option<Conn<M>>, marker: Option<usize>);
622
623 fn spawn_drop(&self, marker: usize);
624}
625
626impl<M: Manager> DropAndSpawn<M> for SharedManagedPool<M> {
627 #[inline]
628 fn drop_pool_ref(&self, conn: &mut Option<Conn<M>>, marker: Option<usize>) {
629 if !self.is_running() {
630 self.drop_conn(0, false);
632 return;
633 }
634
635 let mut conn = match conn.take() {
636 Some(conn) => conn,
637 None => {
638 self.spawn_drop(marker.unwrap());
640 return;
641 }
642 };
643
644 let is_closed = self.manager.is_closed(&mut conn.conn);
645 if is_closed {
646 self.spawn_drop(conn.marker);
647 } else {
648 self.pool_lock.put_back(conn.into());
649 };
650 }
651
652 #[cold]
656 fn spawn_drop(&self, marker: usize) {
657 let opt = self.drop_conn(marker, true);
658
659 if let Some(pending) = opt {
660 let shared_clone = self.clone();
661 self.spawn(async move {
662 let _ = shared_clone.replenish_idle_conn(pending, marker).await;
663 });
664 }
665 }
666}