reool/pools/pool_internal/
extended_connection_factory.rs

1//! A pimped connection factory
2use std::sync::Arc;
3use std::time::Instant;
4
5use future::BoxFuture;
6use futures::prelude::*;
7use log::{trace, warn};
8use tokio::sync::mpsc;
9use tokio::time::sleep;
10
11use crate::backoff_strategy::BackoffStrategy;
12use crate::connection_factory::ConnectionFactory;
13use crate::{Ping, Poolable};
14
15use super::inner_pool::PoolMessage;
16
17use super::instrumentation::PoolInstrumentation;
18use super::{Managed, PoolMessageEnvelope};
19
20/// A connection factory that uses a retry logic when creating connections. As long
21/// as the pool is there, it will retry to create a connection.
22///
23/// Once a connection is created it will be send to the pool via a channel.
24///
25/// There is also instrumentation included since some metrics can be directly sent
26/// after a connection was created or not created.
27pub(crate) struct ExtendedConnectionFactory<T: Poolable> {
28    inner_factory: Arc<dyn ConnectionFactory<Connection = T> + Send + Sync + 'static>,
29    send_back: mpsc::UnboundedSender<PoolMessageEnvelope<T>>,
30    pub instrumentation: PoolInstrumentation,
31    back_off_strategy: BackoffStrategy,
32}
33
34impl<T: Poolable> ExtendedConnectionFactory<T> {
35    pub fn new(
36        inner_factory: Arc<dyn ConnectionFactory<Connection = T> + Send + Sync + 'static>,
37        send_back: mpsc::UnboundedSender<PoolMessageEnvelope<T>>,
38        instrumentation: PoolInstrumentation,
39        back_off_strategy: BackoffStrategy,
40    ) -> Self {
41        Self {
42            inner_factory,
43            send_back,
44            instrumentation,
45            back_off_strategy,
46        }
47    }
48
49    /// Returns a cloned version of the sender to the internal channel
50    pub fn send_back_cloned(&self) -> mpsc::UnboundedSender<PoolMessageEnvelope<T>> {
51        self.send_back.clone()
52    }
53
54    /// Sends a `PoolMessage` on the internal channel.
55    ///
56    /// The message will be wrapped automatically and being unwrapped when
57    /// the message could not be sent.
58    pub fn send_message(&mut self, message: PoolMessage<T>) -> Result<(), PoolMessage<T>> {
59        message.send_on_internal_channel(&mut self.send_back)
60    }
61
62    /// Create a new connections and try as long as the pool is there.
63    ///
64    /// Before each attempt this functions tries to send a probing message to the pool. If
65    /// sending the message fails the channel to the pool is disconnected which
66    /// means that the pool has been dropped.
67    pub fn create_connection(mut self, initiated_at: Instant) {
68        let f = async move {
69            let mut attempt = 1;
70
71            let result = loop {
72                // Probe the channel to the inner pool
73                if self
74                    .send_message(PoolMessage::CheckAlive(Instant::now()))
75                    .is_err()
76                {
77                    break Err("Pool is gone.".to_string());
78                }
79
80                match self.do_a_create_connection_attempt(initiated_at).await {
81                    Ok(managed) => {
82                        drop(managed); // We send it to the pool by dropping it
83
84                        trace!("Dropped newly created connection to be sent to pool");
85
86                        break Ok(());
87                    }
88                    Err(this) => {
89                        self = this;
90
91                        // break delayed_by_backoff_strategy(&factory, &mut attempt).await
92                        if let Some(backoff) = self.back_off_strategy.get_next_backoff(attempt) {
93                            warn!(
94                                "Retry on in to create connection after attempt {} in {:?}",
95                                attempt, backoff
96                            );
97
98                            sleep(backoff).await;
99                        } else {
100                            warn!(
101                                "Retry on in to create connection after attempt {} immediately",
102                                attempt
103                            );
104                        }
105
106                        attempt += 1;
107                    }
108                }
109            };
110
111            if let Err(err) = result {
112                warn!("Create connection finally failed: {}", err);
113            }
114        };
115
116        tokio::spawn(f);
117    }
118
119    pub fn connecting_to(&self) -> &str {
120        self.inner_factory.connecting_to()
121    }
122
123    pub fn ping(&self, timeout: Instant) -> BoxFuture<Ping> {
124        self.inner_factory.ping(timeout)
125    }
126
127    /// Do one attempt on the inner connection factory to
128    /// get a new connection
129    fn do_a_create_connection_attempt(
130        self,
131        initiated_at: Instant,
132    ) -> impl Future<Output = Result<Managed<T>, Self>> {
133        let start_connect = Instant::now();
134        let inner_factory = Arc::clone(&self.inner_factory);
135
136        async move {
137            let conn = inner_factory.create_connection().await;
138
139            match conn {
140                Ok(conn) => {
141                    trace!("new connection created");
142
143                    self.instrumentation
144                        .connection_created(initiated_at.elapsed(), start_connect.elapsed());
145
146                    Ok(Managed::fresh(conn, self))
147                }
148                Err(err) => {
149                    self.instrumentation.connection_factory_failed();
150
151                    warn!("Connection factory failed: {}", err);
152
153                    Err(self)
154                }
155            }
156        }
157    }
158}