tang_rs/
pool.rs

1use core::fmt;
2use core::future::Future;
3use core::ops::{Deref, DerefMut};
4use core::sync::atomic::{AtomicBool, Ordering};
5#[cfg(not(feature = "no-send"))]
6use std::sync::{Arc as WrapPointer, Weak};
7use std::time::Instant;
8#[cfg(feature = "no-send")]
9use std::{
10    rc::{Rc as WrapPointer, Weak},
11    thread::{self, ThreadId},
12};
13
14use crate::builder::Builder;
15use crate::manager::Manager;
16use crate::pool_inner::{PoolLock, State};
17
18pub struct Conn<M: Manager> {
19    conn: M::Connection,
20    marker: usize,
21    birth: Instant,
22}
23
24impl<M: Manager> Conn<M> {
25    pub(crate) fn marker(&self) -> usize {
26        self.marker
27    }
28}
29
30pub struct IdleConn<M: Manager> {
31    conn: Conn<M>,
32    idle_start: Instant,
33}
34
35impl<M: Manager> IdleConn<M> {
36    fn new(conn: M::Connection, marker: usize) -> Self {
37        let now = Instant::now();
38        IdleConn {
39            conn: Conn {
40                conn,
41                marker,
42                birth: now,
43            },
44            idle_start: now,
45        }
46    }
47
48    #[inline]
49    pub(crate) fn marker(&self) -> usize {
50        self.conn.marker
51    }
52}
53
54impl<M: Manager> From<Conn<M>> for IdleConn<M> {
55    fn from(conn: Conn<M>) -> IdleConn<M> {
56        let now = Instant::now();
57        IdleConn {
58            conn,
59            idle_start: now,
60        }
61    }
62}
63
64impl<M: Manager> From<IdleConn<M>> for Conn<M> {
65    fn from(conn: IdleConn<M>) -> Conn<M> {
66        Conn {
67            conn: conn.conn.conn,
68            birth: conn.conn.birth,
69            marker: conn.conn.marker,
70        }
71    }
72}
73
74pub struct ManagedPool<M: Manager> {
75    builder: Builder,
76    manager: M,
77    running: AtomicBool,
78    pool_lock: PoolLock<M>,
79}
80
81impl<M: Manager> ManagedPool<M> {
82    fn new(builder: Builder, manager: M) -> Self {
83        let pool_lock = PoolLock::from_builder(&builder);
84
85        Self {
86            builder,
87            manager,
88            running: AtomicBool::new(true),
89            pool_lock,
90        }
91    }
92
93    #[inline]
94    async fn get_conn<'a, R>(&'a self, shared_pool: &'a SharedManagedPool<M>) -> Result<R, M::Error>
95    where
96        R: PoolRefBehavior<'a, M> + Unpin,
97    {
98        let fut = self.pool_lock.lock::<R>(shared_pool);
99        let timeout = self.builder.wait_timeout;
100
101        let mut pool_ref = self.manager.timeout(fut, timeout).await?;
102
103        if self.builder.always_check {
104            let mut retry = 0u8;
105            loop {
106                let result = self.check_conn(&mut pool_ref).await;
107                match result {
108                    Ok(Ok(_)) => break,
109                    Ok(Err(e)) => {
110                        pool_ref.take_drop();
111                        if retry == 3 {
112                            return Err(e);
113                        } else {
114                            retry += 1;
115                        };
116                    }
117                    Err(timeout_err) => {
118                        pool_ref.take_drop();
119                        return Err(timeout_err);
120                    }
121                }
122
123                let fut = self.pool_lock.lock::<R>(shared_pool);
124
125                pool_ref = self.manager.timeout(fut, timeout).await?;
126            }
127        };
128
129        Ok(pool_ref)
130    }
131
132    fn drop_conn(&self, marker: usize, should_spawn_new: bool) -> Option<usize> {
133        //  We might need to spin up more connections to maintain the idle limit.
134        //  e.g. if we hit connection lifetime limits
135        self.pool_lock.dec_spawned(marker, should_spawn_new)
136    }
137
138    pub(crate) async fn add_idle_conn(&self, marker: usize) -> Result<(), M::Error> {
139        let fut = self.manager.connect();
140        let timeout = self.builder.connection_timeout;
141
142        let conn = self
143            .manager
144            .timeout(fut, timeout)
145            .await
146            .map_err(|e| {
147                self.pool_lock.dec_pending(1);
148                e
149            })?
150            .map_err(|e| {
151                self.pool_lock.dec_pending(1);
152                e
153            })?;
154
155        self.pool_lock
156            .put_back_inc_spawned(IdleConn::new(conn, marker));
157
158        Ok(())
159    }
160
161    async fn check_conn(&self, conn: &mut M::Connection) -> Result<Result<(), M::Error>, M::Error> {
162        let fut = self.manager.is_valid(conn);
163
164        let timeout = self.builder.connection_timeout;
165
166        let res = self.manager.timeout(fut, timeout).await?;
167
168        Ok(res)
169    }
170
171    async fn replenish_idle_conn(
172        &self,
173        pending_count: usize,
174        marker: usize,
175    ) -> Result<(), M::Error> {
176        for i in 0..pending_count {
177            self.add_idle_conn(marker).await.map_err(|e| {
178                // we return when an error occur so we should drop all the pending after the i.
179                // (the pending of i is already dropped in add_connection method)
180                let count = pending_count - i - 1;
181                if count > 0 {
182                    self.pool_lock.dec_pending(count);
183                };
184                e
185            })?;
186        }
187
188        Ok(())
189    }
190
191    fn if_running(&self, running: bool) {
192        self.running.store(running, Ordering::Release);
193    }
194
195    pub(crate) fn is_running(&self) -> bool {
196        self.running.load(Ordering::Acquire)
197    }
198
199    #[cfg(not(feature = "no-send"))]
200    pub(crate) fn spawn<Fut>(&self, fut: Fut)
201    where
202        Fut: Future<Output = ()> + Send + 'static,
203    {
204        self.manager.spawn(fut);
205    }
206
207    #[cfg(feature = "no-send")]
208    pub(crate) fn spawn<Fut>(&self, fut: Fut)
209    where
210        Fut: Future<Output = ()> + 'static,
211    {
212        self.manager.spawn(fut);
213    }
214
215    pub async fn reap_idle_conn(&self) -> Result<(), M::Error> {
216        let now = Instant::now();
217
218        let pending_new = self.pool_lock.try_drop_conn(|conn| {
219            let mut should_drop = false;
220            if let Some(timeout) = self.builder.idle_timeout {
221                should_drop |= now >= conn.idle_start + timeout;
222            }
223            if let Some(lifetime) = self.builder.max_lifetime {
224                should_drop |= now >= conn.conn.birth + lifetime;
225            }
226            should_drop
227        });
228
229        match pending_new {
230            Some((pending_new, marker)) => self.replenish_idle_conn(pending_new, marker).await,
231            None => Ok(()),
232        }
233    }
234
235    pub fn garbage_collect(&self) {
236        self.pool_lock
237            .drop_pending(|pending| pending.should_remove(self.builder.connection_timeout));
238    }
239
240    /// expose `Builder` to public
241    pub fn get_builder(&self) -> &Builder {
242        &self.builder
243    }
244}
245
246pub type SharedManagedPool<M> = WrapPointer<ManagedPool<M>>;
247
248pub type WeakSharedManagedPool<M> = Weak<ManagedPool<M>>;
249
250pub struct Pool<M: Manager> {
251    pool: SharedManagedPool<M>,
252    #[cfg(feature = "no-send")]
253    id: ThreadId,
254}
255
256impl<M: Manager> Clone for Pool<M> {
257    fn clone(&self) -> Self {
258        Self {
259            pool: self.pool.clone(),
260            #[cfg(feature = "no-send")]
261            id: self.id,
262        }
263    }
264}
265
266impl<M: Manager> fmt::Debug for Pool<M> {
267    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
268        f.write_fmt(format_args!("Pool({:p})", self.pool))
269    }
270}
271
272impl<M: Manager> Drop for Pool<M> {
273    fn drop(&mut self) {
274        self.pool.manager.on_stop();
275    }
276}
277
278impl<M: Manager> Pool<M> {
279    /// Return a `PoolRef` contains reference of `SharedManagedPool<Manager>` and an `Option<Manager::Connection>`.
280    ///
281    /// The `PoolRef` should be dropped asap when you finish the use of it. Hold it in scope would prevent the connection from pushed back to pool.
282    #[must_use = "futures do nothing unless you `.await` or poll them"]
283    pub async fn get(&self) -> Result<PoolRef<'_, M>, M::Error> {
284        let shared_pool = &self.pool;
285
286        shared_pool.get_conn(shared_pool).await
287    }
288
289    /// Return a `PoolRefOwned` contains a weak smart pointer of `SharedManagedPool<Manager>` and an `Option<Manager::Connection>`.
290    ///
291    /// You can move `PoolRefOwned` to async blocks and across await point. But the performance is considerably worse than `Pool::get`.
292    #[must_use = "futures do nothing unless you `.await` or poll them"]
293    pub async fn get_owned(&self) -> Result<PoolRefOwned<M>, M::Error> {
294        let shared_pool = &self.pool;
295
296        shared_pool.get_conn(shared_pool).await
297    }
298
299    /// Run the pool with an async closure.
300    ///
301    /// # example:
302    /// ```ignore
303    /// pool.run(|mut pool_ref| async {
304    ///     let connection = &mut *pool_ref;
305    ///     Ok(())
306    /// })
307    /// ```
308    #[must_use = "futures do nothing unless you `.await` or poll them"]
309    pub async fn run<'a, T, E, F, FF>(&'a self, f: F) -> Result<T, E>
310    where
311        F: FnOnce(PoolRef<'a, M>) -> FF,
312        FF: Future<Output = Result<T, E>> + Send + 'a,
313        E: From<M::Error>,
314        T: Send + 'static,
315    {
316        let pool_ref = self.get().await?;
317        f(pool_ref).await
318    }
319
320    /// Pause the pool
321    ///
322    /// these functionalities will stop:
323    /// - get connection. `Pool<Manager>::get()` would eventually be timed out
324    ///     (If `Manager::timeout` is manually implemented with proper timeout function. *. Otherwise it will stuck forever in executor unless you cancel the future).
325    /// - spawn of new connection.
326    /// - default scheduled works (They would skip at least one iteration if the schedule time come across with the time period the pool is paused).
327    /// - put back connection. (connection will be dropped instead.)
328    pub fn pause(&self) {
329        self.pool.if_running(false);
330    }
331
332    /// restart the pool.
333    // ToDo: for now pool would lose accuracy for min_idle after restart. It would recover after certain amount of requests to pool.
334    pub fn resume(&self) {
335        self.pool.if_running(true);
336    }
337
338    /// check if the pool is running.
339    pub fn running(&self) -> bool {
340        self.pool.is_running()
341    }
342
343    /// Clear the pool.
344    ///
345    /// All pending connections will also be destroyed.
346    ///
347    /// Spawned count is not reset to 0 so new connections can't fill the pool until all outgoing `PoolRef` are dropped.
348    ///
349    /// All `PoolRef' and 'PoolRefOwned` outside of pool before the clear happen would be destroyed when trying to return it's connection to pool.
350    pub fn clear(&self) {
351        self.pool.pool_lock.clear()
352    }
353
354    /// manually initialize pool. this is usually called when the `Pool` is built with `build_uninitialized`
355    /// This is useful when you want to make an empty `Pool` and initialize it later.
356    /// # example:
357    /// ```ignore
358    /// #[macro_use]
359    /// extern crate lazy_static;
360    ///
361    /// use tokio_postgres_tang::{Pool, PostgresManager, Builder};
362    /// use tokio_postgres::NoTls;
363    ///
364    /// lazy_static! {
365    ///    static ref POOL: Pool<PostgresManager<NoTls>> = Builder::new()
366    ///         .always_check(false)
367    ///         .idle_timeout(None)
368    ///         .max_lifetime(None)
369    ///         .min_idle(24)
370    ///         .max_size(24)
371    ///         .build_uninitialized(
372    ///             PostgresManager::new_from_stringlike("postgres://postgres:123@localhost/test", NoTls)
373    ///                 .expect("can't make postgres manager")
374    ///         );
375    /// }
376    ///
377    /// #[tokio::main]
378    /// async fn main() -> std::io::Result<()> {
379    ///     POOL.init().await.expect("Failed to initialize postgres pool");
380    ///     Ok(())
381    /// }
382    /// ```
383    #[must_use = "futures do nothing unless you `.await` or poll them"]
384    pub async fn init(&self) -> Result<(), M::Error> {
385        let shared_pool = &self.pool;
386
387        let marker = shared_pool.pool_lock.marker();
388
389        shared_pool
390            .replenish_idle_conn(shared_pool.builder.min_idle, marker)
391            .await?;
392
393        shared_pool.manager.on_start(shared_pool);
394
395        Ok(())
396    }
397
398    /// Change the max size of pool. This operation could result in some reallocation of `PoolInner` and impact the performance.
399    /// (`Pool<Manager>::clear()` will recalibrate the pool with a capacity of current max/min pool size)
400    ///
401    /// No actual check is used for new `max_size`. Be ware not to pass a size smaller than `min_idle`.
402    pub fn set_max_size(&self, size: usize) {
403        self.pool.pool_lock.set_max_size(size);
404    }
405
406    /// Change the min idle size of pool.
407    /// (`Pool<Manager>::clear()` will recalibrate the pool with a capacity of current max/min pool size)
408    ///
409    /// No actual check is used for new `min_idle`. Be ware not to pass a size bigger than `max_size`.
410    pub fn set_min_idle(&self, size: usize) {
411        self.pool.pool_lock.set_min_idle(size);
412    }
413
414    /// expose `Manager` to public
415    pub fn get_manager(&self) -> &M {
416        &self.pool.manager
417    }
418
419    /// Return a state of the pool inner. This call will block the thread and wait for lock.
420    pub fn state(&self) -> State {
421        self.pool.pool_lock.state()
422    }
423
424    /// Return the thread id the pool is running on.(This is only useful when running the pool on single threads)
425    #[cfg(feature = "no-send")]
426    pub fn thread_id(&self) -> ThreadId {
427        self.id
428    }
429
430    pub(crate) fn new(builder: Builder, manager: M) -> Self {
431        let pool = ManagedPool::new(builder, manager);
432        Pool {
433            pool: WrapPointer::new(pool),
434            #[cfg(feature = "no-send")]
435            id: thread::current().id(),
436        }
437    }
438}
439
440pub struct PoolRef<'a, M: Manager> {
441    conn: Option<Conn<M>>,
442    shared_pool: &'a SharedManagedPool<M>,
443    // marker is only used to store the marker of Conn<M> if it's been taken from pool
444    marker: Option<usize>,
445}
446
447pub struct PoolRefOwned<M: Manager> {
448    conn: Option<Conn<M>>,
449    shared_pool: WeakSharedManagedPool<M>,
450    marker: Option<usize>,
451}
452
453impl<M: Manager> PoolRef<'_, M> {
454    /// get a mut reference of connection.
455    pub fn get_conn(&mut self) -> &mut M::Connection {
456        &mut *self
457    }
458
459    /// take the the ownership of connection from pool and it won't be pushed back to pool anymore.
460    pub fn take_conn(&mut self) -> Option<M::Connection> {
461        self.conn.take().map(|c| {
462            self.marker = Some(c.marker);
463            c.conn
464        })
465    }
466
467    /// manually push a connection to pool. We treat this connection as a new born one.
468    ///
469    /// operation will fail if the pool is already in full capacity(no error will return and this connection will be dropped silently)
470    pub fn push_conn(&mut self, conn: M::Connection) {
471        // if the PoolRef have a marker then the conn must have been taken.
472        // otherwise we give the marker of self.conn to the newly generated one.
473        let marker = match self.marker {
474            Some(marker) => marker,
475            None => self.conn.as_ref().map(|c| c.marker()).unwrap(),
476        };
477
478        self.conn = Some(Conn {
479            conn,
480            marker,
481            birth: Instant::now(),
482        });
483    }
484
485    pub fn get_manager(&self) -> &M {
486        &self.shared_pool.manager
487    }
488}
489
490impl<M: Manager> PoolRefOwned<M> {
491    /// get a mut reference of connection.
492    pub fn get_conn(&mut self) -> &mut M::Connection {
493        &mut *self
494    }
495
496    /// take the the ownership of connection from pool and it won't be pushed back to pool anymore.
497    pub fn take_conn(&mut self) -> Option<M::Connection> {
498        self.conn.take().map(|c| {
499            self.marker = Some(c.marker);
500            c.conn
501        })
502    }
503
504    /// manually push a connection to pool. We treat this connection as a new born one.
505    ///
506    /// operation will fail if the pool is already in full capacity(no error will return and this connection will be dropped silently)
507    pub fn push_conn(&mut self, conn: M::Connection) {
508        // if the PoolRef have a marker then the conn must have been taken.
509        // otherwise we give the marker of self.conn to the newly generated one.
510        let marker = match self.marker {
511            Some(marker) => marker,
512            None => self.conn.as_ref().map(|c| c.marker()).unwrap(),
513        };
514
515        self.conn = Some(Conn {
516            conn,
517            marker,
518            birth: Instant::now(),
519        });
520    }
521}
522
523pub(crate) trait PoolRefBehavior<'a, M: Manager>
524where
525    Self: DerefMut<Target = M::Connection> + Sized,
526{
527    fn from_idle(conn: IdleConn<M>, shared_pool: &'a SharedManagedPool<M>) -> Self;
528
529    fn take_drop(self) {}
530}
531
532impl<'re, M: Manager> PoolRefBehavior<'re, M> for PoolRef<'re, M> {
533    fn from_idle(conn: IdleConn<M>, shared_pool: &'re SharedManagedPool<M>) -> Self {
534        Self {
535            conn: Some(conn.into()),
536            shared_pool,
537            marker: None,
538        }
539    }
540
541    fn take_drop(mut self) {
542        let _ = self.take_conn();
543    }
544}
545
546impl<M: Manager> PoolRefBehavior<'_, M> for PoolRefOwned<M> {
547    fn from_idle(conn: IdleConn<M>, shared_pool: &SharedManagedPool<M>) -> Self {
548        Self {
549            conn: Some(conn.into()),
550            shared_pool: WrapPointer::downgrade(shared_pool),
551            marker: None,
552        }
553    }
554
555    fn take_drop(mut self) {
556        let _ = self.take_conn();
557    }
558}
559
560impl<M: Manager> Deref for PoolRef<'_, M> {
561    type Target = M::Connection;
562
563    fn deref(&self) -> &Self::Target {
564        &self
565            .conn
566            .as_ref()
567            .expect("Connection has already been taken")
568            .conn
569    }
570}
571
572impl<M: Manager> DerefMut for PoolRef<'_, M> {
573    fn deref_mut(&mut self) -> &mut Self::Target {
574        &mut self
575            .conn
576            .as_mut()
577            .expect("Connection has already been taken")
578            .conn
579    }
580}
581
582impl<M: Manager> Deref for PoolRefOwned<M> {
583    type Target = M::Connection;
584
585    fn deref(&self) -> &Self::Target {
586        &self
587            .conn
588            .as_ref()
589            .expect("Connection has already been taken")
590            .conn
591    }
592}
593
594impl<M: Manager> DerefMut for PoolRefOwned<M> {
595    fn deref_mut(&mut self) -> &mut Self::Target {
596        &mut self
597            .conn
598            .as_mut()
599            .expect("Connection has already been taken")
600            .conn
601    }
602}
603
604impl<M: Manager> Drop for PoolRef<'_, M> {
605    #[inline]
606    fn drop(&mut self) {
607        self.shared_pool.drop_pool_ref(&mut self.conn, self.marker);
608    }
609}
610
611impl<M: Manager> Drop for PoolRefOwned<M> {
612    #[inline]
613    fn drop(&mut self) {
614        if let Some(shared_pool) = self.shared_pool.upgrade() {
615            shared_pool.drop_pool_ref(&mut self.conn, self.marker);
616        }
617    }
618}
619
620trait DropAndSpawn<M: Manager> {
621    fn drop_pool_ref(&self, conn: &mut Option<Conn<M>>, marker: Option<usize>);
622
623    fn spawn_drop(&self, marker: usize);
624}
625
626impl<M: Manager> DropAndSpawn<M> for SharedManagedPool<M> {
627    #[inline]
628    fn drop_pool_ref(&self, conn: &mut Option<Conn<M>>, marker: Option<usize>) {
629        if !self.is_running() {
630            // marker here doesn't matter as should_spawn_new would reject new connection generation
631            self.drop_conn(0, false);
632            return;
633        }
634
635        let mut conn = match conn.take() {
636            Some(conn) => conn,
637            None => {
638                // if the connection is taken then self.marker must be Some(usize)
639                self.spawn_drop(marker.unwrap());
640                return;
641            }
642        };
643
644        let is_closed = self.manager.is_closed(&mut conn.conn);
645        if is_closed {
646            self.spawn_drop(conn.marker);
647        } else {
648            self.pool_lock.put_back(conn.into());
649        };
650    }
651
652    // helper function to spawn drop connection and spawn new ones if needed.
653    // Conn<M> should be dropped in place where spawn_drop() is used.
654    // ToDo: Will get unsolvable pending if the spawn is panic;
655    #[cold]
656    fn spawn_drop(&self, marker: usize) {
657        let opt = self.drop_conn(marker, true);
658
659        if let Some(pending) = opt {
660            let shared_clone = self.clone();
661            self.spawn(async move {
662                let _ = shared_clone.replenish_idle_conn(pending, marker).await;
663            });
664        }
665    }
666}