1use crate::connection::{Connection};
2use crate::error::ConnectionError;
3use crate::executor::Executor;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Duration;
7
8use futures::{channel::oneshot, lock::Mutex};
9use native_tls::Certificate;
10use rand::Rng;
11use url::Url;
12
13#[derive(Debug, Clone, Hash, PartialEq, Eq)]
15pub struct BrokerAddress {
16 pub url: Url,
18 pub broker_url: String,
21 pub proxy: bool,
23}
24
/// Settings that govern how connections are established and kept alive.
#[derive(Clone, Debug)]
pub struct ConnectionRetryOptions {
    /// Base delay of the exponential backoff; also the unit the random jitter
    /// is multiplied with.
    pub min_backoff: Duration,
    /// Cap applied to the exponential part of the backoff (jitter is added on
    /// top of the capped value).
    pub max_backoff: Duration,
    /// Number of retries after which a failing connection attempt is reported
    /// to the caller.
    pub max_retries: u32,
    /// Timeout forwarded to `Connection::new`.
    pub connection_timeout: Duration,
    /// Period of the keep-alive ping task spawned for every connection.
    pub keep_alive: Duration,
}

impl Default for ConnectionRetryOptions {
    /// Defaults: 10 ms minimum backoff, 30 s maximum backoff, 12 retries,
    /// 10 s connection timeout and a 60 s keep-alive period.
    fn default() -> Self {
        Self {
            min_backoff: Duration::from_millis(10),
            max_backoff: Duration::from_secs(30),
            max_retries: 12,
            connection_timeout: Duration::from_secs(10),
            keep_alive: Duration::from_secs(60),
        }
    }
}
51
/// Settings that govern how individual operations are timed out and retried.
#[derive(Clone, Debug)]
pub struct OperationRetryOptions {
    /// Timeout forwarded to `Connection::new` for operations on a connection.
    pub operation_timeout: Duration,
    /// Delay between retries of an operation.
    // NOTE(review): not read anywhere in this file; semantics presumed from the name.
    pub retry_delay: Duration,
    /// Optional bound on the number of operation retries.
    // NOTE(review): `None` presumably means "no limit" - confirm against the callers.
    pub max_retries: Option<u32>,
}

impl Default for OperationRetryOptions {
    /// Defaults: 30 s operation timeout, 500 ms retry delay, no retry bound.
    fn default() -> Self {
        Self {
            operation_timeout: Duration::from_secs(30),
            retry_delay: Duration::from_millis(500),
            max_retries: None,
        }
    }
}
72
/// TLS-related settings used when opening connections.
#[derive(Clone, Debug)]
pub struct TlsOptions {
    /// Optional PEM-encoded certificate chain; `ConnectionManager::new` parses
    /// it into `native_tls` certificates.
    pub certificate_chain: Option<Vec<u8>>,

    /// Forwarded to `Connection::new` as the "allow insecure connection" flag.
    pub allow_insecure_connection: bool,

    /// Forwarded to `Connection::new` as the hostname-verification flag.
    pub tls_hostname_verification_enabled: bool,
}

impl Default for TlsOptions {
    /// Defaults: no extra certificate chain, insecure connections disallowed,
    /// hostname verification enabled.
    fn default() -> Self {
        TlsOptions {
            certificate_chain: None,
            allow_insecure_connection: false,
            tls_hostname_verification_enabled: true,
        }
    }
}
99
100enum ConnectionStatus<Exe: Executor> {
101 Connected(Arc<Connection<Exe>>),
102 Connecting(Vec<oneshot::Sender<Result<Arc<Connection<Exe>>, ConnectionError>>>),
103}
104
105#[derive(Clone)]
111pub struct ConnectionManager<Exe: Executor> {
112 pub url: Url,
113 auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
114 pub(crate) executor: Arc<Exe>,
115 connections: Arc<Mutex<HashMap<BrokerAddress, ConnectionStatus<Exe>>>>,
116 connection_retry_options: ConnectionRetryOptions,
117 pub(crate) operation_retry_options: OperationRetryOptions,
118 tls_options: TlsOptions,
119 certificate_chain: Vec<native_tls::Certificate>,
120}
121
122impl<Exe: Executor> ConnectionManager<Exe> {
123 pub async fn new(
124 url: String,
125 auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
126 connection_retry: Option<ConnectionRetryOptions>,
127 operation_retry_options: OperationRetryOptions,
128 tls: Option<TlsOptions>,
129 executor: Arc<Exe>,
130 ) -> Result<Self, ConnectionError> {
131 let connection_retry_options = connection_retry.unwrap_or_default();
132 let tls_options = tls.unwrap_or_default();
133 let url = Url::parse(&url)
134 .map_err(|e| {
135 error!("error parsing URL: {:?}", e);
136 ConnectionError::NotFound
137 })
138 .and_then(|url| {
139 url.host_str().ok_or_else(|| {
140 error!("missing host for URL: {:?}", url);
141 ConnectionError::NotFound
142 })?;
143 Ok(url)
144 })?;
145
146 let certificate_chain = match tls_options.certificate_chain.as_ref() {
147 None => vec![],
148 Some(certificate_chain) => {
149 let mut v = vec![];
150 for cert in pem::parse_many(&certificate_chain).iter().rev() {
151 v.push(
152 Certificate::from_der(&cert.contents[..])
153 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
154 );
155 }
156 v
157 }
158 };
159
160 if let Some(auth) = auth.clone() {
161 auth.lock().await.initialize().await?;
162 }
163
164 let manager = ConnectionManager {
165 url: url.clone(),
166 auth,
167 executor,
168 connections: Arc::new(Mutex::new(HashMap::new())),
169 connection_retry_options,
170 operation_retry_options,
171 tls_options,
172 certificate_chain,
173 };
174 let broker_address = BrokerAddress {
175 url: url.clone(),
176 broker_url: format!("{}:{}", url.host_str().unwrap(), url.port().unwrap_or(6650)),
177 proxy: false,
178 };
179 manager.connect(broker_address).await?;
180 Ok(manager)
181 }
182
183 pub fn get_base_address(&self) -> BrokerAddress {
184 BrokerAddress {
185 url: self.url.clone(),
186 broker_url: format!(
187 "{}:{}",
188 self.url.host_str().unwrap(),
189 self.url.port().unwrap_or(6650)
190 ),
191 proxy: false,
192 }
193 }
194
195 pub async fn get_base_connection(&self) -> Result<Arc<Connection<Exe>>, ConnectionError> {
199 let broker_address = BrokerAddress {
200 url: self.url.clone(),
201 broker_url: format!(
202 "{}:{}",
203 self.url.host_str().unwrap(),
204 self.url.port().unwrap_or(6650)
205 ),
206 proxy: false,
207 };
208
209 self.get_connection(&broker_address).await
210 }
211
212 pub async fn get_connection(
216 &self,
217 broker: &BrokerAddress,
218 ) -> Result<Arc<Connection<Exe>>, ConnectionError> {
219 let rx = {
220 let mut conns = self.connections.lock().await;
221 match conns.get_mut(broker) {
222 None => None,
223 Some(ConnectionStatus::Connected(conn)) => {
224 if conn.is_valid() {
225 return Ok(conn.clone());
226 } else {
227 None
228 }
229 }
230 Some(ConnectionStatus::Connecting(ref mut v)) => {
231 let (tx, rx) = oneshot::channel();
232 v.push(tx);
233 Some(rx)
234 }
235 }
236 };
237
238 match rx {
239 None => self.connect(broker.clone()).await,
240 Some(rx) => match rx.await {
241 Ok(res) => res,
242 Err(_) => Err(ConnectionError::Canceled),
243 },
244 }
245 }
246
247 async fn connect_inner(
248 &self,
249 broker: &BrokerAddress,
250 ) -> Result<Arc<Connection<Exe>>, ConnectionError> {
251 debug!("ConnectionManager::connect({:?})", broker);
252
253 let rx = {
254 match self
255 .connections
256 .lock()
257 .await
258 .entry(broker.clone())
259 .or_insert_with(|| ConnectionStatus::Connecting(Vec::new()))
260 {
261 ConnectionStatus::Connecting(ref mut v) => {
262 if v.is_empty() {
263 None
264 } else {
265 let (tx, rx) = oneshot::channel();
266 v.push(tx);
267 Some(rx)
268 }
269 }
270 ConnectionStatus::Connected(_) => None,
271 }
272 };
273 if let Some(rx) = rx {
274 return match rx.await {
275 Ok(res) => res,
276 Err(_) => Err(ConnectionError::Canceled),
277 };
278 }
279
280 let proxy_url = if broker.proxy {
281 Some(broker.broker_url.clone())
282 } else {
283 None
284 };
285
286 let mut current_backoff;
287 let mut current_retries = 0u32;
288
289 let start = std::time::Instant::now();
290 let conn = loop {
291 match Connection::new(
292 broker.url.clone(),
293 self.auth.clone(),
294 proxy_url.clone(),
295 &self.certificate_chain,
296 self.tls_options.allow_insecure_connection,
297 self.tls_options.tls_hostname_verification_enabled,
298 self.connection_retry_options.connection_timeout,
299 self.operation_retry_options.operation_timeout,
300 self.executor.clone(),
301 )
302 .await
303 {
304 Ok(c) => break c,
305 Err(ConnectionError::Io(e)) => {
306 if e.kind() != std::io::ErrorKind::ConnectionRefused
307 || e.kind() != std::io::ErrorKind::TimedOut
308 {
309 return Err(ConnectionError::Io(e));
310 }
311
312 if current_retries == self.connection_retry_options.max_retries {
313 return Err(ConnectionError::Io(e));
314 }
315
316 let jitter = rand::thread_rng().gen_range(0..10);
317 current_backoff = std::cmp::min(
318 self.connection_retry_options.min_backoff
319 * 2u32.saturating_pow(current_retries),
320 self.connection_retry_options.max_backoff,
321 ) + self.connection_retry_options.min_backoff * jitter;
322 current_retries += 1;
323
324 trace!(
325 "current retries: {}, current_backoff(pow = {}): {}ms",
326 current_retries,
327 2u32.pow(current_retries - 1),
328 current_backoff.as_millis()
329 );
330 error!(
331 "connection error, retrying connection to {} after {}ms",
332 broker.url,
333 current_backoff.as_millis()
334 );
335 self.executor.delay(current_backoff).await;
336 }
337 Err(e) => return Err(e),
338 }
339 };
340 let connection_id = conn.id();
341 if let Some(url) = proxy_url.as_ref() {
342 info!(
343 "Connected n°{} to {} via proxy {} in {}ms",
344 connection_id,
345 url,
346 broker.url,
347 (std::time::Instant::now() - start).as_millis()
348 );
349 } else {
350 info!(
351 "Connected n°{} to {} in {}ms",
352 connection_id,
353 broker.url,
354 (std::time::Instant::now() - start).as_millis()
355 );
356 }
357 let c = Arc::new(conn);
358
359 Ok(c)
360 }
361
362 async fn connect(
363 &self,
364 broker: BrokerAddress,
365 ) -> Result<Arc<Connection<Exe>>, ConnectionError> {
366 let c = match self.connect_inner(&broker).await {
367 Err(e) => {
368 if let Some(ConnectionStatus::Connecting(mut v)) =
374 self.connections.lock().await.remove(&broker)
375 {
376 for tx in v.drain(..) {
377 let _ = tx.send(Err(ConnectionError::Canceled));
380 }
381 }
382
383 return Err(e);
384 }
385 Ok(c) => c,
386 };
387
388 let connection_id = c.id();
389 let proxy_url = if broker.proxy {
390 Some(broker.broker_url.clone())
391 } else {
392 None
393 };
394
395 let weak_conn = Arc::downgrade(&c);
397 let mut interval = self
398 .executor
399 .interval(self.connection_retry_options.keep_alive);
400 let broker_url = broker.url.clone();
401 let proxy_to_broker_url = proxy_url.clone();
402 let res = self.executor.spawn(Box::pin(async move {
403 use crate::futures::StreamExt;
404 while let Some(()) = interval.next().await {
405 if let Some(url) = proxy_to_broker_url.as_ref() {
406 trace!(
407 "will ping connection {} to {} via proxy {}",
408 connection_id,
409 url,
410 broker_url
411 );
412 } else {
413 trace!("will ping connection {} to {}", connection_id, broker_url);
414 }
415 if let Some(strong_conn) = weak_conn.upgrade() {
416 if !strong_conn.is_valid() {
417 trace!("connection {} is not valid anymore, skip heart beat task",
418 connection_id);
419 break;
420 }
421 if let Err(e) = strong_conn.sender().send_ping().await {
422 error!(
423 "could not ping connection {} to the server at {}: {}",
424 connection_id, broker_url, e
425 );
426 }
427 } else {
428 trace!("strong connection was dropped, stopping keepalive task");
431 break;
432 }
433 }
434 }));
435 if res.is_err() {
436 error!("the executor could not spawn the heartbeat future");
437 return Err(ConnectionError::Shutdown);
438 }
439
440 let old = self
441 .connections
442 .lock()
443 .await
444 .insert(broker, ConnectionStatus::Connected(c.clone()));
445 match old {
446 Some(ConnectionStatus::Connecting(mut v)) => {
447 for tx in v.drain(..) {
449 let _ = tx.send(Ok(c.clone()));
450 }
451 }
452 Some(ConnectionStatus::Connected(_)) => {
453 }
455 None => {
456 }
458 };
459
460 Ok(c)
461 }
462
463 pub(crate) async fn check_connections(&self) {
465 trace!("cleaning invalid or unused connections");
466 self.connections
467 .lock()
468 .await
469 .retain(|_, ref mut connection| match connection {
470 ConnectionStatus::Connecting(_) => true,
471 ConnectionStatus::Connected(conn) => {
472 conn.is_valid() && Arc::strong_count(conn) > 1
479 }
480 });
481 }
482}