reool/pools/pool_internal/
extended_connection_factory.rs1use std::sync::Arc;
3use std::time::Instant;
4
5use future::BoxFuture;
6use futures::prelude::*;
7use log::{trace, warn};
8use tokio::sync::mpsc;
9use tokio::time::sleep;
10
11use crate::backoff_strategy::BackoffStrategy;
12use crate::connection_factory::ConnectionFactory;
13use crate::{Ping, Poolable};
14
15use super::inner_pool::PoolMessage;
16
17use super::instrumentation::PoolInstrumentation;
18use super::{Managed, PoolMessageEnvelope};
19
20pub(crate) struct ExtendedConnectionFactory<T: Poolable> {
28 inner_factory: Arc<dyn ConnectionFactory<Connection = T> + Send + Sync + 'static>,
29 send_back: mpsc::UnboundedSender<PoolMessageEnvelope<T>>,
30 pub instrumentation: PoolInstrumentation,
31 back_off_strategy: BackoffStrategy,
32}
33
34impl<T: Poolable> ExtendedConnectionFactory<T> {
35 pub fn new(
36 inner_factory: Arc<dyn ConnectionFactory<Connection = T> + Send + Sync + 'static>,
37 send_back: mpsc::UnboundedSender<PoolMessageEnvelope<T>>,
38 instrumentation: PoolInstrumentation,
39 back_off_strategy: BackoffStrategy,
40 ) -> Self {
41 Self {
42 inner_factory,
43 send_back,
44 instrumentation,
45 back_off_strategy,
46 }
47 }
48
49 pub fn send_back_cloned(&self) -> mpsc::UnboundedSender<PoolMessageEnvelope<T>> {
51 self.send_back.clone()
52 }
53
54 pub fn send_message(&mut self, message: PoolMessage<T>) -> Result<(), PoolMessage<T>> {
59 message.send_on_internal_channel(&mut self.send_back)
60 }
61
62 pub fn create_connection(mut self, initiated_at: Instant) {
68 let f = async move {
69 let mut attempt = 1;
70
71 let result = loop {
72 if self
74 .send_message(PoolMessage::CheckAlive(Instant::now()))
75 .is_err()
76 {
77 break Err("Pool is gone.".to_string());
78 }
79
80 match self.do_a_create_connection_attempt(initiated_at).await {
81 Ok(managed) => {
82 drop(managed); trace!("Dropped newly created connection to be sent to pool");
85
86 break Ok(());
87 }
88 Err(this) => {
89 self = this;
90
91 if let Some(backoff) = self.back_off_strategy.get_next_backoff(attempt) {
93 warn!(
94 "Retry on in to create connection after attempt {} in {:?}",
95 attempt, backoff
96 );
97
98 sleep(backoff).await;
99 } else {
100 warn!(
101 "Retry on in to create connection after attempt {} immediately",
102 attempt
103 );
104 }
105
106 attempt += 1;
107 }
108 }
109 };
110
111 if let Err(err) = result {
112 warn!("Create connection finally failed: {}", err);
113 }
114 };
115
116 tokio::spawn(f);
117 }
118
119 pub fn connecting_to(&self) -> &str {
120 self.inner_factory.connecting_to()
121 }
122
123 pub fn ping(&self, timeout: Instant) -> BoxFuture<Ping> {
124 self.inner_factory.ping(timeout)
125 }
126
127 fn do_a_create_connection_attempt(
130 self,
131 initiated_at: Instant,
132 ) -> impl Future<Output = Result<Managed<T>, Self>> {
133 let start_connect = Instant::now();
134 let inner_factory = Arc::clone(&self.inner_factory);
135
136 async move {
137 let conn = inner_factory.create_connection().await;
138
139 match conn {
140 Ok(conn) => {
141 trace!("new connection created");
142
143 self.instrumentation
144 .connection_created(initiated_at.elapsed(), start_connect.elapsed());
145
146 Ok(Managed::fresh(conn, self))
147 }
148 Err(err) => {
149 self.instrumentation.connection_factory_failed();
150
151 warn!("Connection factory failed: {}", err);
152
153 Err(self)
154 }
155 }
156 }
157 }
158}