pulsar/
connection_manager.rs

1use crate::connection::{Connection};
2use crate::error::ConnectionError;
3use crate::executor::Executor;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Duration;
7
8use futures::{channel::oneshot, lock::Mutex};
9use native_tls::Certificate;
10use rand::Rng;
11use url::Url;
12
13/// holds connection information for a broker
14#[derive(Debug, Clone, Hash, PartialEq, Eq)]
15pub struct BrokerAddress {
16    /// URL we're using for connection (can be the proxy's URL)
17    pub url: Url,
18    /// pulsar URL for the broker we're actually contacting
19    /// this must follow the IP:port format
20    pub broker_url: String,
21    /// true if we're connecting through a proxy
22    pub proxy: bool,
23}
24
/// Settings for connection attempts and their exponential back off
#[derive(Debug, Clone)]
pub struct ConnectionRetryOptions {
    /// minimum delay between connection retries; also the unit of the random
    /// jitter added to every delay
    pub min_backoff: Duration,
    /// maximum delay between connection retries (before jitter is added)
    pub max_backoff: Duration,
    /// maximum number of connection retries
    pub max_retries: u32,
    /// time limit to establish a connection
    pub connection_timeout: Duration,
    /// keep-alive interval for each broker connection
    pub keep_alive: Duration,
}

impl Default for ConnectionRetryOptions {
    /// Back off from 10ms up to 30s, 12 retries, 10s connection timeout and
    /// a keep-alive every 60s
    fn default() -> Self {
        Self {
            min_backoff: Duration::from_millis(10),
            max_backoff: Duration::from_secs(30),
            max_retries: 12,
            connection_timeout: Duration::from_secs(10),
            keep_alive: Duration::from_secs(60),
        }
    }
}
51
/// Settings for retrying Pulsar operations
#[derive(Debug, Clone)]
pub struct OperationRetryOptions {
    /// how long to wait for the answer to a Pulsar operation
    pub operation_timeout: Duration,
    /// pause between two attempts after a ServiceNotReady error
    pub retry_delay: Duration,
    /// upper bound on the number of retries; `None` means retry forever
    pub max_retries: Option<u32>,
}

impl Default for OperationRetryOptions {
    /// 30s operation timeout, 500ms between retries, unlimited retries
    fn default() -> Self {
        Self {
            operation_timeout: Duration::from_secs(30),
            retry_delay: Duration::from_millis(500),
            max_retries: None,
        }
    }
}
72
/// Settings for TLS connections
#[derive(Debug, Clone)]
pub struct TlsOptions {
    /// PEM encoded certificates, concatenated in a single buffer
    pub certificate_chain: Option<Vec<u8>>,

    /// accept insecure TLS connections when set to true
    ///
    /// defaults to *false*
    pub allow_insecure_connection: bool,

    /// whether hostname verification is enabled when insecure TLS connection is allowed
    ///
    /// defaults to *true*
    pub tls_hostname_verification_enabled: bool,
}

impl Default for TlsOptions {
    /// No extra certificates, secure connections only, hostname verification on
    fn default() -> Self {
        TlsOptions {
            tls_hostname_verification_enabled: true,
            allow_insecure_connection: false,
            certificate_chain: None,
        }
    }
}
99
100enum ConnectionStatus<Exe: Executor> {
101    Connected(Arc<Connection<Exe>>),
102    Connecting(Vec<oneshot::Sender<Result<Arc<Connection<Exe>>, ConnectionError>>>),
103}
104
105/// Look up broker addresses for topics and partitioned topics
106///
107/// The ConnectionManager object provides a single interface to start
108/// interacting with a cluster. It will automatically follow redirects
109/// or use a proxy, and aggregate broker connections
110#[derive(Clone)]
111pub struct ConnectionManager<Exe: Executor> {
112    pub url: Url,
113    auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
114    pub(crate) executor: Arc<Exe>,
115    connections: Arc<Mutex<HashMap<BrokerAddress, ConnectionStatus<Exe>>>>,
116    connection_retry_options: ConnectionRetryOptions,
117    pub(crate) operation_retry_options: OperationRetryOptions,
118    tls_options: TlsOptions,
119    certificate_chain: Vec<native_tls::Certificate>,
120}
121
122impl<Exe: Executor> ConnectionManager<Exe> {
123    pub async fn new(
124        url: String,
125        auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
126        connection_retry: Option<ConnectionRetryOptions>,
127        operation_retry_options: OperationRetryOptions,
128        tls: Option<TlsOptions>,
129        executor: Arc<Exe>,
130    ) -> Result<Self, ConnectionError> {
131        let connection_retry_options = connection_retry.unwrap_or_default();
132        let tls_options = tls.unwrap_or_default();
133        let url = Url::parse(&url)
134            .map_err(|e| {
135                error!("error parsing URL: {:?}", e);
136                ConnectionError::NotFound
137            })
138            .and_then(|url| {
139                url.host_str().ok_or_else(|| {
140                    error!("missing host for URL: {:?}", url);
141                    ConnectionError::NotFound
142                })?;
143                Ok(url)
144            })?;
145
146        let certificate_chain = match tls_options.certificate_chain.as_ref() {
147            None => vec![],
148            Some(certificate_chain) => {
149                let mut v = vec![];
150                for cert in pem::parse_many(&certificate_chain).iter().rev() {
151                    v.push(
152                        Certificate::from_der(&cert.contents[..])
153                            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
154                    );
155                }
156                v
157            }
158        };
159
160        if let Some(auth) = auth.clone() {
161            auth.lock().await.initialize().await?;
162        }
163
164        let manager = ConnectionManager {
165            url: url.clone(),
166            auth,
167            executor,
168            connections: Arc::new(Mutex::new(HashMap::new())),
169            connection_retry_options,
170            operation_retry_options,
171            tls_options,
172            certificate_chain,
173        };
174        let broker_address = BrokerAddress {
175            url: url.clone(),
176            broker_url: format!("{}:{}", url.host_str().unwrap(), url.port().unwrap_or(6650)),
177            proxy: false,
178        };
179        manager.connect(broker_address).await?;
180        Ok(manager)
181    }
182
183    pub fn get_base_address(&self) -> BrokerAddress {
184        BrokerAddress {
185            url: self.url.clone(),
186            broker_url: format!(
187                "{}:{}",
188                self.url.host_str().unwrap(),
189                self.url.port().unwrap_or(6650)
190            ),
191            proxy: false,
192        }
193    }
194
195    /// get an active Connection from a broker address
196    ///
197    /// creates a connection if not available
198    pub async fn get_base_connection(&self) -> Result<Arc<Connection<Exe>>, ConnectionError> {
199        let broker_address = BrokerAddress {
200            url: self.url.clone(),
201            broker_url: format!(
202                "{}:{}",
203                self.url.host_str().unwrap(),
204                self.url.port().unwrap_or(6650)
205            ),
206            proxy: false,
207        };
208
209        self.get_connection(&broker_address).await
210    }
211
212    /// get an active Connection from a broker address
213    ///
214    /// creates a connection if not available
215    pub async fn get_connection(
216        &self,
217        broker: &BrokerAddress,
218    ) -> Result<Arc<Connection<Exe>>, ConnectionError> {
219        let rx = {
220            let mut conns = self.connections.lock().await;
221            match conns.get_mut(broker) {
222                None => None,
223                Some(ConnectionStatus::Connected(conn)) => {
224                    if conn.is_valid() {
225                        return Ok(conn.clone());
226                    } else {
227                        None
228                    }
229                }
230                Some(ConnectionStatus::Connecting(ref mut v)) => {
231                    let (tx, rx) = oneshot::channel();
232                    v.push(tx);
233                    Some(rx)
234                }
235            }
236        };
237
238        match rx {
239            None => self.connect(broker.clone()).await,
240            Some(rx) => match rx.await {
241                Ok(res) => res,
242                Err(_) => Err(ConnectionError::Canceled),
243            },
244        }
245    }
246
247    async fn connect_inner(
248        &self,
249        broker: &BrokerAddress,
250    ) -> Result<Arc<Connection<Exe>>, ConnectionError> {
251        debug!("ConnectionManager::connect({:?})", broker);
252
253        let rx = {
254            match self
255                .connections
256                .lock()
257                .await
258                .entry(broker.clone())
259                .or_insert_with(|| ConnectionStatus::Connecting(Vec::new()))
260            {
261                ConnectionStatus::Connecting(ref mut v) => {
262                    if v.is_empty() {
263                        None
264                    } else {
265                        let (tx, rx) = oneshot::channel();
266                        v.push(tx);
267                        Some(rx)
268                    }
269                }
270                ConnectionStatus::Connected(_) => None,
271            }
272        };
273        if let Some(rx) = rx {
274            return match rx.await {
275                Ok(res) => res,
276                Err(_) => Err(ConnectionError::Canceled),
277            };
278        }
279
280        let proxy_url = if broker.proxy {
281            Some(broker.broker_url.clone())
282        } else {
283            None
284        };
285
286        let mut current_backoff;
287        let mut current_retries = 0u32;
288
289        let start = std::time::Instant::now();
290        let conn = loop {
291            match Connection::new(
292                broker.url.clone(),
293                self.auth.clone(),
294                proxy_url.clone(),
295                &self.certificate_chain,
296                self.tls_options.allow_insecure_connection,
297                self.tls_options.tls_hostname_verification_enabled,
298                self.connection_retry_options.connection_timeout,
299                self.operation_retry_options.operation_timeout,
300                self.executor.clone(),
301            )
302            .await
303            {
304                Ok(c) => break c,
305                Err(ConnectionError::Io(e)) => {
306                    if e.kind() != std::io::ErrorKind::ConnectionRefused
307                        || e.kind() != std::io::ErrorKind::TimedOut
308                    {
309                        return Err(ConnectionError::Io(e));
310                    }
311
312                    if current_retries == self.connection_retry_options.max_retries {
313                        return Err(ConnectionError::Io(e));
314                    }
315
316                    let jitter = rand::thread_rng().gen_range(0..10);
317                    current_backoff = std::cmp::min(
318                        self.connection_retry_options.min_backoff
319                            * 2u32.saturating_pow(current_retries),
320                        self.connection_retry_options.max_backoff,
321                    ) + self.connection_retry_options.min_backoff * jitter;
322                    current_retries += 1;
323
324                    trace!(
325                        "current retries: {}, current_backoff(pow = {}): {}ms",
326                        current_retries,
327                        2u32.pow(current_retries - 1),
328                        current_backoff.as_millis()
329                    );
330                    error!(
331                        "connection error, retrying connection to {} after {}ms",
332                        broker.url,
333                        current_backoff.as_millis()
334                    );
335                    self.executor.delay(current_backoff).await;
336                }
337                Err(e) => return Err(e),
338            }
339        };
340        let connection_id = conn.id();
341        if let Some(url) = proxy_url.as_ref() {
342            info!(
343                "Connected n°{} to {} via proxy {} in {}ms",
344                connection_id,
345                url,
346                broker.url,
347                (std::time::Instant::now() - start).as_millis()
348            );
349        } else {
350            info!(
351                "Connected n°{} to {} in {}ms",
352                connection_id,
353                broker.url,
354                (std::time::Instant::now() - start).as_millis()
355            );
356        }
357        let c = Arc::new(conn);
358
359        Ok(c)
360    }
361
362    async fn connect(
363        &self,
364        broker: BrokerAddress,
365    ) -> Result<Arc<Connection<Exe>>, ConnectionError> {
366        let c = match self.connect_inner(&broker).await {
367            Err(e) => {
368                // the current ConnectionStatus is Connecting, containing
369                // notification channels for all the tasks waiting for the
370                // reconnection. If we delete this status, they will be
371                // notified that reconnection is canceled instead of getting
372                // stuck
373                if let Some(ConnectionStatus::Connecting(mut v)) =
374                    self.connections.lock().await.remove(&broker)
375                {
376                    for tx in v.drain(..) {
377                        // we cannot clone ConnectionError so we tell other
378                        // tasks that reconnection is canceled
379                        let _ = tx.send(Err(ConnectionError::Canceled));
380                    }
381                }
382
383                return Err(e);
384            }
385            Ok(c) => c,
386        };
387
388        let connection_id = c.id();
389        let proxy_url = if broker.proxy {
390            Some(broker.broker_url.clone())
391        } else {
392            None
393        };
394
395        // set up client heartbeats for the connection
396        let weak_conn = Arc::downgrade(&c);
397        let mut interval = self
398            .executor
399            .interval(self.connection_retry_options.keep_alive);
400        let broker_url = broker.url.clone();
401        let proxy_to_broker_url = proxy_url.clone();
402        let res = self.executor.spawn(Box::pin(async move {
403            use crate::futures::StreamExt;
404            while let Some(()) = interval.next().await {
405                if let Some(url) = proxy_to_broker_url.as_ref() {
406                    trace!(
407                        "will ping connection {} to {} via proxy {}",
408                        connection_id,
409                        url,
410                        broker_url
411                    );
412                } else {
413                    trace!("will ping connection {} to {}", connection_id, broker_url);
414                }
415                if let Some(strong_conn) = weak_conn.upgrade() {
416                    if !strong_conn.is_valid() {
417                        trace!("connection {} is not valid anymore, skip heart beat task",
418                            connection_id);
419                        break;
420                    }
421                    if let Err(e) = strong_conn.sender().send_ping().await {
422                        error!(
423                            "could not ping connection {} to the server at {}: {}",
424                            connection_id, broker_url, e
425                        );
426                    }
427                } else {
428                    // if the strong pointers were dropped, we can stop the heartbeat for this
429                    // connection
430                    trace!("strong connection was dropped, stopping keepalive task");
431                    break;
432                }
433            }
434        }));
435        if res.is_err() {
436            error!("the executor could not spawn the heartbeat future");
437            return Err(ConnectionError::Shutdown);
438        }
439
440        let old = self
441            .connections
442            .lock()
443            .await
444            .insert(broker, ConnectionStatus::Connected(c.clone()));
445        match old {
446            Some(ConnectionStatus::Connecting(mut v)) => {
447                //info!("was in connecting state({} waiting)", v.len());
448                for tx in v.drain(..) {
449                    let _ = tx.send(Ok(c.clone()));
450                }
451            }
452            Some(ConnectionStatus::Connected(_)) => {
453                //info!("removing old connection");
454            }
455            None => {
456                //info!("setting up new connection");
457            }
458        };
459
460        Ok(c)
461    }
462
463    /// tests that all connections are valid and still used
464    pub(crate) async fn check_connections(&self) {
465        trace!("cleaning invalid or unused connections");
466        self.connections
467            .lock()
468            .await
469            .retain(|_, ref mut connection| match connection {
470                ConnectionStatus::Connecting(_) => true,
471                ConnectionStatus::Connected(conn) => {
472                    // if the manager holds the only reference to that
473                    // connection, we can remove it from the manager
474                    // no need for special synchronization here: we're already
475                    // in a mutex, and a case appears where the Arc is cloned
476                    // somewhere at the same time, that just means the manager
477                    // will create a new connection the next time it is asked
478                    conn.is_valid() && Arc::strong_count(conn) > 1
479                }
480            });
481    }
482}