redis/
client.rs

1use std::time::Duration;
2
3#[cfg(feature = "aio")]
4use crate::aio::{AsyncPushSender, DefaultAsyncDNSResolver};
5#[cfg(feature = "aio")]
6use crate::io::{tcp::TcpSettings, AsyncDNSResolver};
7use crate::{
8    connection::{connect, Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo},
9    types::{RedisResult, Value},
10};
11#[cfg(feature = "aio")]
12use std::pin::Pin;
13
14#[cfg(feature = "tls-rustls")]
15use crate::tls::{inner_build_with_tls, TlsCertificates};
16
17#[cfg(feature = "cache-aio")]
18use crate::caching::CacheConfig;
19#[cfg(all(
20    feature = "cache-aio",
21    any(feature = "connection-manager", feature = "cluster-async")
22))]
23use crate::caching::CacheManager;
24
25/// The client type.
26#[derive(Debug, Clone)]
27pub struct Client {
28    pub(crate) connection_info: ConnectionInfo,
29}
30
31/// The client acts as connector to the redis server.  By itself it does not
32/// do much other than providing a convenient way to fetch a connection from
33/// it.  In the future the plan is to provide a connection pool in the client.
34///
35/// When opening a client a URL in the following format should be used:
36///
37/// ```plain
38/// redis://host:port/db
39/// ```
40///
41/// Example usage::
42///
43/// ```rust,no_run
44/// let client = redis::Client::open("redis://127.0.0.1/").unwrap();
45/// let con = client.get_connection().unwrap();
46/// ```
47impl Client {
48    /// Connects to a redis server and returns a client.  This does not
49    /// actually open a connection yet but it does perform some basic
50    /// checks on the URL that might make the operation fail.
51    pub fn open<T: IntoConnectionInfo>(params: T) -> RedisResult<Client> {
52        Ok(Client {
53            connection_info: params.into_connection_info()?,
54        })
55    }
56
57    /// Instructs the client to actually connect to redis and returns a
58    /// connection object.  The connection object can be used to send
59    /// commands to the server.  This can fail with a variety of errors
60    /// (like unreachable host) so it's important that you handle those
61    /// errors.
62    pub fn get_connection(&self) -> RedisResult<Connection> {
63        connect(&self.connection_info, None)
64    }
65
66    /// Instructs the client to actually connect to redis with specified
67    /// timeout and returns a connection object.  The connection object
68    /// can be used to send commands to the server.  This can fail with
69    /// a variety of errors (like unreachable host) so it's important
70    /// that you handle those errors.
71    pub fn get_connection_with_timeout(&self, timeout: Duration) -> RedisResult<Connection> {
72        connect(&self.connection_info, Some(timeout))
73    }
74
75    /// Returns a reference of client connection info object.
76    pub fn get_connection_info(&self) -> &ConnectionInfo {
77        &self.connection_info
78    }
79
80    /// Constructs a new `Client` with parameters necessary to create a TLS connection.
81    ///
82    /// - `conn_info` - URL using the `rediss://` scheme.
83    /// - `tls_certs` - `TlsCertificates` structure containing:
84    ///     - `client_tls` - Optional `ClientTlsConfig` containing byte streams for
85    ///         - `client_cert` - client's byte stream containing client certificate in PEM format
86    ///         - `client_key` - client's byte stream containing private key in PEM format
87    ///     - `root_cert` - Optional byte stream yielding PEM formatted file for root certificates.
88    ///
89    /// If `ClientTlsConfig` ( cert+key pair ) is not provided, then client-side authentication is not enabled.
90    /// If `root_cert` is not provided, then system root certificates are used instead.
91    ///
92    /// # Examples
93    ///
94    /// ```no_run
95    /// use std::{fs::File, io::{BufReader, Read}};
96    ///
97    /// use redis::{Client, AsyncCommands as _, TlsCertificates, ClientTlsConfig};
98    ///
99    /// async fn do_redis_code(
100    ///     url: &str,
101    ///     root_cert_file: &str,
102    ///     cert_file: &str,
103    ///     key_file: &str
104    /// ) -> redis::RedisResult<()> {
105    ///     let root_cert_file = File::open(root_cert_file).expect("cannot open private cert file");
106    ///     let mut root_cert_vec = Vec::new();
107    ///     BufReader::new(root_cert_file)
108    ///         .read_to_end(&mut root_cert_vec)
109    ///         .expect("Unable to read ROOT cert file");
110    ///
111    ///     let cert_file = File::open(cert_file).expect("cannot open private cert file");
112    ///     let mut client_cert_vec = Vec::new();
113    ///     BufReader::new(cert_file)
114    ///         .read_to_end(&mut client_cert_vec)
115    ///         .expect("Unable to read client cert file");
116    ///
117    ///     let key_file = File::open(key_file).expect("cannot open private key file");
118    ///     let mut client_key_vec = Vec::new();
119    ///     BufReader::new(key_file)
120    ///         .read_to_end(&mut client_key_vec)
121    ///         .expect("Unable to read client key file");
122    ///
123    ///     let client = Client::build_with_tls(
124    ///         url,
125    ///         TlsCertificates {
126    ///             client_tls: Some(ClientTlsConfig{
127    ///                 client_cert: client_cert_vec,
128    ///                 client_key: client_key_vec,
129    ///             }),
130    ///             root_cert: Some(root_cert_vec),
131    ///         }
132    ///     )
133    ///     .expect("Unable to build client");
134    ///
135    ///     let connection_info = client.get_connection_info();
136    ///
137    ///     println!(">>> connection info: {connection_info:?}");
138    ///
139    ///     let mut con = client.get_multiplexed_async_connection().await?;
140    ///
141    ///     con.set("key1", b"foo").await?;
142    ///
143    ///     redis::cmd("SET")
144    ///         .arg(&["key2", "bar"])
145    ///         .exec_async(&mut con)
146    ///         .await?;
147    ///
148    ///     let result = redis::cmd("MGET")
149    ///         .arg(&["key1", "key2"])
150    ///         .query_async(&mut con)
151    ///         .await;
152    ///     assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
153    ///     println!("Result from MGET: {result:?}");
154    ///
155    ///     Ok(())
156    /// }
157    /// ```
158    #[cfg(feature = "tls-rustls")]
159    pub fn build_with_tls<C: IntoConnectionInfo>(
160        conn_info: C,
161        tls_certs: TlsCertificates,
162    ) -> RedisResult<Client> {
163        let connection_info = conn_info.into_connection_info()?;
164
165        inner_build_with_tls(connection_info, &tls_certs)
166    }
167}
168
169#[cfg(feature = "cache-aio")]
170#[derive(Clone)]
171pub(crate) enum Cache {
172    Config(CacheConfig),
173    #[cfg(any(feature = "connection-manager", feature = "cluster-async"))]
174    Manager(CacheManager),
175}
176
177/// Options for creation of async connection
178#[cfg(feature = "aio")]
179#[derive(Clone, Default)]
180pub struct AsyncConnectionConfig {
181    /// Maximum time to wait for a response from the server
182    pub(crate) response_timeout: Option<std::time::Duration>,
183    /// Maximum time to wait for a connection to be established
184    pub(crate) connection_timeout: Option<std::time::Duration>,
185    pub(crate) push_sender: Option<std::sync::Arc<dyn AsyncPushSender>>,
186    #[cfg(feature = "cache-aio")]
187    pub(crate) cache: Option<Cache>,
188    pub(crate) tcp_settings: TcpSettings,
189    pub(crate) dns_resolver: Option<std::sync::Arc<dyn AsyncDNSResolver>>,
190}
191
192#[cfg(feature = "aio")]
193impl AsyncConnectionConfig {
194    /// Creates a new instance of the options with nothing set
195    pub fn new() -> Self {
196        Self::default()
197    }
198
199    /// Sets the connection timeout
200    pub fn set_connection_timeout(mut self, connection_timeout: std::time::Duration) -> Self {
201        self.connection_timeout = Some(connection_timeout);
202        self
203    }
204
205    /// Sets the response timeout
206    pub fn set_response_timeout(mut self, response_timeout: std::time::Duration) -> Self {
207        self.response_timeout = Some(response_timeout);
208        self
209    }
210
211    /// Sets sender sender for push values.
212    ///
213    /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
214    /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::protocol] field.
215    ///
216    /// # Examples
217    ///
218    /// ```rust
219    /// # use redis::AsyncConnectionConfig;
220    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
221    /// let config = AsyncConnectionConfig::new().set_push_sender(tx);
222    /// ```
223    ///
224    /// ```rust
225    /// # use std::sync::{Mutex, Arc};
226    /// # use redis::AsyncConnectionConfig;
227    /// let messages = Arc::new(Mutex::new(Vec::new()));
228    /// let config = AsyncConnectionConfig::new().set_push_sender(move |msg|{
229    ///     let Ok(mut messages) = messages.lock() else {
230    ///         return Err(redis::aio::SendError);
231    ///     };
232    ///     messages.push(msg);
233    ///     Ok(())
234    /// });
235    /// ```
236    pub fn set_push_sender(self, sender: impl AsyncPushSender) -> Self {
237        self.set_push_sender_internal(std::sync::Arc::new(sender))
238    }
239
240    pub(crate) fn set_push_sender_internal(
241        mut self,
242        sender: std::sync::Arc<dyn AsyncPushSender>,
243    ) -> Self {
244        self.push_sender = Some(sender);
245        self
246    }
247
248    /// Sets cache config for MultiplexedConnection, check CacheConfig for more details.
249    #[cfg(feature = "cache-aio")]
250    pub fn set_cache_config(mut self, cache_config: CacheConfig) -> Self {
251        self.cache = Some(Cache::Config(cache_config));
252        self
253    }
254
255    #[cfg(all(
256        feature = "cache-aio",
257        any(feature = "connection-manager", feature = "cluster-async")
258    ))]
259    pub(crate) fn set_cache_manager(mut self, cache_manager: CacheManager) -> Self {
260        self.cache = Some(Cache::Manager(cache_manager));
261        self
262    }
263
264    /// Set the behavior of the underlying TCP connection.
265    pub fn set_tcp_settings(self, tcp_settings: crate::io::tcp::TcpSettings) -> Self {
266        Self {
267            tcp_settings,
268            ..self
269        }
270    }
271
272    /// Set the DNS resolver for the underlying TCP connection.
273    ///
274    /// The parameter resolver must implement the [`crate::io::AsyncDNSResolver`] trait.
275    pub fn set_dns_resolver(self, dns_resolver: impl AsyncDNSResolver) -> Self {
276        self.set_dns_resolver_internal(std::sync::Arc::new(dns_resolver))
277    }
278
279    pub(super) fn set_dns_resolver_internal(
280        mut self,
281        dns_resolver: std::sync::Arc<dyn AsyncDNSResolver>,
282    ) -> Self {
283        self.dns_resolver = Some(dns_resolver);
284        self
285    }
286}
287
288/// To enable async support you need to chose one of the supported runtimes and active its
289/// corresponding feature: `tokio-comp` or `async-std-comp`
290#[cfg(feature = "aio")]
291#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
292impl Client {
293    /// Returns an async connection from the client.
294    #[cfg(feature = "aio")]
295    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
296    pub async fn get_multiplexed_async_connection(
297        &self,
298    ) -> RedisResult<crate::aio::MultiplexedConnection> {
299        self.get_multiplexed_async_connection_with_config(&AsyncConnectionConfig::new())
300            .await
301    }
302
303    /// Returns an async connection from the client.
304    #[cfg(feature = "aio")]
305    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
306    #[deprecated(note = "Use `get_multiplexed_async_connection_with_config` instead")]
307    pub async fn get_multiplexed_async_connection_with_timeouts(
308        &self,
309        response_timeout: std::time::Duration,
310        connection_timeout: std::time::Duration,
311    ) -> RedisResult<crate::aio::MultiplexedConnection> {
312        self.get_multiplexed_async_connection_with_config(
313            &AsyncConnectionConfig::new()
314                .set_connection_timeout(connection_timeout)
315                .set_response_timeout(response_timeout),
316        )
317        .await
318    }
319
320    /// Returns an async connection from the client.
321    #[cfg(feature = "aio")]
322    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
323    pub async fn get_multiplexed_async_connection_with_config(
324        &self,
325        config: &AsyncConnectionConfig,
326    ) -> RedisResult<crate::aio::MultiplexedConnection> {
327        match Runtime::locate() {
328            #[cfg(feature = "tokio-comp")]
329            rt @ Runtime::Tokio => self
330                .get_multiplexed_async_connection_inner_with_timeout::<crate::aio::tokio::Tokio>(
331                    config, rt,
332                )
333                .await,
334
335            #[cfg(feature = "async-std-comp")]
336            rt @ Runtime::AsyncStd => self.get_multiplexed_async_connection_inner_with_timeout::<
337                crate::aio::async_std::AsyncStd,
338            >(config, rt)
339            .await,
340
341            #[cfg(feature = "smol-comp")]
342            rt @ Runtime::Smol => self.get_multiplexed_async_connection_inner_with_timeout::<
343                crate::aio::smol::Smol,
344            >(config, rt)
345            .await,
346        }
347    }
348
349    /// Returns an async multiplexed connection from the client.
350    ///
351    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
352    /// on the same underlying connection (tcp/unix socket).
353    #[cfg(feature = "tokio-comp")]
354    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
355    pub async fn get_multiplexed_tokio_connection_with_response_timeouts(
356        &self,
357        response_timeout: std::time::Duration,
358        connection_timeout: std::time::Duration,
359    ) -> RedisResult<crate::aio::MultiplexedConnection> {
360        let result = Runtime::locate()
361            .timeout(
362                connection_timeout,
363                self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
364                    &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
365                ),
366            )
367            .await;
368
369        match result {
370            Ok(Ok(connection)) => Ok(connection),
371            Ok(Err(e)) => Err(e),
372            Err(elapsed) => Err(elapsed.into()),
373        }
374    }
375
376    /// Returns an async multiplexed connection from the client.
377    ///
378    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
379    /// on the same underlying connection (tcp/unix socket).
380    #[cfg(feature = "tokio-comp")]
381    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
382    pub async fn get_multiplexed_tokio_connection(
383        &self,
384    ) -> RedisResult<crate::aio::MultiplexedConnection> {
385        self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
386            &AsyncConnectionConfig::new(),
387        )
388        .await
389    }
390
391    /// Returns an async multiplexed connection from the client.
392    ///
393    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
394    /// on the same underlying connection (tcp/unix socket).
395    #[cfg(feature = "async-std-comp")]
396    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
397    pub async fn get_multiplexed_async_std_connection_with_timeouts(
398        &self,
399        response_timeout: std::time::Duration,
400        connection_timeout: std::time::Duration,
401    ) -> RedisResult<crate::aio::MultiplexedConnection> {
402        let result = Runtime::locate()
403            .timeout(
404                connection_timeout,
405                self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
406                    &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
407                ),
408            )
409            .await;
410
411        match result {
412            Ok(Ok(connection)) => Ok(connection),
413            Ok(Err(e)) => Err(e),
414            Err(elapsed) => Err(elapsed.into()),
415        }
416    }
417
418    /// Returns an async multiplexed connection from the client.
419    ///
420    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
421    /// on the same underlying connection (tcp/unix socket).
422    #[cfg(feature = "async-std-comp")]
423    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
424    pub async fn get_multiplexed_async_std_connection(
425        &self,
426    ) -> RedisResult<crate::aio::MultiplexedConnection> {
427        self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
428            &AsyncConnectionConfig::new(),
429        )
430        .await
431    }
432
433    /// Returns an async multiplexed connection from the client and a future which must be polled
434    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
435    ///
436    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
437    /// on the same underlying connection (tcp/unix socket).
438    /// The multiplexer will return a timeout error on any request that takes longer then `response_timeout`.
439    #[cfg(feature = "tokio-comp")]
440    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
441    pub async fn create_multiplexed_tokio_connection_with_response_timeout(
442        &self,
443        response_timeout: std::time::Duration,
444    ) -> RedisResult<(
445        crate::aio::MultiplexedConnection,
446        impl std::future::Future<Output = ()>,
447    )> {
448        self.create_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
449            &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
450        )
451        .await
452    }
453
454    /// Returns an async multiplexed connection from the client and a future which must be polled
455    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
456    ///
457    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
458    /// on the same underlying connection (tcp/unix socket).
459    #[cfg(feature = "tokio-comp")]
460    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
461    pub async fn create_multiplexed_tokio_connection(
462        &self,
463    ) -> RedisResult<(
464        crate::aio::MultiplexedConnection,
465        impl std::future::Future<Output = ()>,
466    )> {
467        self.create_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
468            &AsyncConnectionConfig::new(),
469        )
470        .await
471    }
472
473    /// Returns an async multiplexed connection from the client and a future which must be polled
474    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
475    ///
476    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
477    /// on the same underlying connection (tcp/unix socket).
478    /// The multiplexer will return a timeout error on any request that takes longer then `response_timeout`.
479    #[cfg(feature = "async-std-comp")]
480    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
481    pub async fn create_multiplexed_async_std_connection_with_response_timeout(
482        &self,
483        response_timeout: std::time::Duration,
484    ) -> RedisResult<(
485        crate::aio::MultiplexedConnection,
486        impl std::future::Future<Output = ()>,
487    )> {
488        self.create_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
489            &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
490        )
491        .await
492    }
493
494    /// Returns an async multiplexed connection from the client and a future which must be polled
495    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
496    ///
497    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
498    /// on the same underlying connection (tcp/unix socket).
499    #[cfg(feature = "async-std-comp")]
500    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
501    pub async fn create_multiplexed_async_std_connection(
502        &self,
503    ) -> RedisResult<(
504        crate::aio::MultiplexedConnection,
505        impl std::future::Future<Output = ()>,
506    )> {
507        self.create_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
508            &AsyncConnectionConfig::new(),
509        )
510        .await
511    }
512
513    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
514    ///
515    /// The connection manager wraps a
516    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
517    /// connection fails with a connection error, then a new connection is
518    /// established in the background and the error is returned to the caller.
519    ///
520    /// This means that on connection loss at least one command will fail, but
521    /// the connection will be re-established automatically if possible. Please
522    /// refer to the [`ConnectionManager`][connection-manager] docs for
523    /// detailed reconnecting behavior.
524    ///
525    /// A connection manager can be cloned, allowing requests to be sent concurrently
526    /// on the same underlying connection (tcp/unix socket).
527    ///
528    /// [connection-manager]: aio/struct.ConnectionManager.html
529    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
530    #[cfg(feature = "connection-manager")]
531    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
532    #[deprecated(note = "use get_connection_manager instead")]
533    pub async fn get_tokio_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
534        crate::aio::ConnectionManager::new(self.clone()).await
535    }
536
537    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
538    ///
539    /// The connection manager wraps a
540    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
541    /// connection fails with a connection error, then a new connection is
542    /// established in the background and the error is returned to the caller.
543    ///
544    /// This means that on connection loss at least one command will fail, but
545    /// the connection will be re-established automatically if possible. Please
546    /// refer to the [`ConnectionManager`][connection-manager] docs for
547    /// detailed reconnecting behavior.
548    ///
549    /// A connection manager can be cloned, allowing requests to be sent concurrently
550    /// on the same underlying connection (tcp/unix socket).
551    ///
552    /// [connection-manager]: aio/struct.ConnectionManager.html
553    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
554    #[cfg(feature = "connection-manager")]
555    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
556    pub async fn get_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
557        crate::aio::ConnectionManager::new(self.clone()).await
558    }
559
560    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
561    ///
562    /// The connection manager wraps a
563    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
564    /// connection fails with a connection error, then a new connection is
565    /// established in the background and the error is returned to the caller.
566    ///
567    /// This means that on connection loss at least one command will fail, but
568    /// the connection will be re-established automatically if possible. Please
569    /// refer to the [`ConnectionManager`][connection-manager] docs for
570    /// detailed reconnecting behavior.
571    ///
572    /// A connection manager can be cloned, allowing requests to be sent concurrently
573    /// on the same underlying connection (tcp/unix socket).
574    ///
575    /// [connection-manager]: aio/struct.ConnectionManager.html
576    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
577    #[cfg(feature = "connection-manager")]
578    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
579    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
580    pub async fn get_tokio_connection_manager_with_backoff(
581        &self,
582        exponent_base: u64,
583        factor: u64,
584        number_of_retries: usize,
585    ) -> RedisResult<crate::aio::ConnectionManager> {
586        use crate::aio::ConnectionManagerConfig;
587
588        let config = ConnectionManagerConfig::new()
589            .set_exponent_base(exponent_base)
590            .set_factor(factor)
591            .set_number_of_retries(number_of_retries);
592        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
593    }
594
595    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
596    ///
597    /// The connection manager wraps a
598    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
599    /// connection fails with a connection error, then a new connection is
600    /// established in the background and the error is returned to the caller.
601    ///
602    /// This means that on connection loss at least one command will fail, but
603    /// the connection will be re-established automatically if possible. Please
604    /// refer to the [`ConnectionManager`][connection-manager] docs for
605    /// detailed reconnecting behavior.
606    ///
607    /// A connection manager can be cloned, allowing requests to be sent concurrently
608    /// on the same underlying connection (tcp/unix socket).
609    ///
610    /// [connection-manager]: aio/struct.ConnectionManager.html
611    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
612    #[cfg(feature = "connection-manager")]
613    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
614    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
615    pub async fn get_tokio_connection_manager_with_backoff_and_timeouts(
616        &self,
617        exponent_base: u64,
618        factor: u64,
619        number_of_retries: usize,
620        response_timeout: std::time::Duration,
621        connection_timeout: std::time::Duration,
622    ) -> RedisResult<crate::aio::ConnectionManager> {
623        use crate::aio::ConnectionManagerConfig;
624
625        let config = ConnectionManagerConfig::new()
626            .set_exponent_base(exponent_base)
627            .set_factor(factor)
628            .set_response_timeout(response_timeout)
629            .set_connection_timeout(connection_timeout)
630            .set_number_of_retries(number_of_retries);
631        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
632    }
633
634    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
635    ///
636    /// The connection manager wraps a
637    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
638    /// connection fails with a connection error, then a new connection is
639    /// established in the background and the error is returned to the caller.
640    ///
641    /// This means that on connection loss at least one command will fail, but
642    /// the connection will be re-established automatically if possible. Please
643    /// refer to the [`ConnectionManager`][connection-manager] docs for
644    /// detailed reconnecting behavior.
645    ///
646    /// A connection manager can be cloned, allowing requests to be sent concurrently
647    /// on the same underlying connection (tcp/unix socket).
648    ///
649    /// [connection-manager]: aio/struct.ConnectionManager.html
650    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
651    #[cfg(feature = "connection-manager")]
652    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
653    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
654    pub async fn get_connection_manager_with_backoff_and_timeouts(
655        &self,
656        exponent_base: u64,
657        factor: u64,
658        number_of_retries: usize,
659        response_timeout: std::time::Duration,
660        connection_timeout: std::time::Duration,
661    ) -> RedisResult<crate::aio::ConnectionManager> {
662        use crate::aio::ConnectionManagerConfig;
663
664        let config = ConnectionManagerConfig::new()
665            .set_exponent_base(exponent_base)
666            .set_factor(factor)
667            .set_response_timeout(response_timeout)
668            .set_connection_timeout(connection_timeout)
669            .set_number_of_retries(number_of_retries);
670        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
671    }
672
673    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
674    ///
675    /// The connection manager wraps a
676    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
677    /// connection fails with a connection error, then a new connection is
678    /// established in the background and the error is returned to the caller.
679    ///
680    /// This means that on connection loss at least one command will fail, but
681    /// the connection will be re-established automatically if possible. Please
682    /// refer to the [`ConnectionManager`][connection-manager] docs for
683    /// detailed reconnecting behavior.
684    ///
685    /// A connection manager can be cloned, allowing requests to be sent concurrently
686    /// on the same underlying connection (tcp/unix socket).
687    ///
688    /// [connection-manager]: aio/struct.ConnectionManager.html
689    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
690    #[cfg(feature = "connection-manager")]
691    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
692    pub async fn get_connection_manager_with_config(
693        &self,
694        config: crate::aio::ConnectionManagerConfig,
695    ) -> RedisResult<crate::aio::ConnectionManager> {
696        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
697    }
698
699    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
700    ///
701    /// The connection manager wraps a
702    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
703    /// connection fails with a connection error, then a new connection is
704    /// established in the background and the error is returned to the caller.
705    ///
706    /// This means that on connection loss at least one command will fail, but
707    /// the connection will be re-established automatically if possible. Please
708    /// refer to the [`ConnectionManager`][connection-manager] docs for
709    /// detailed reconnecting behavior.
710    ///
711    /// A connection manager can be cloned, allowing requests to be be sent concurrently
712    /// on the same underlying connection (tcp/unix socket).
713    ///
714    /// [connection-manager]: aio/struct.ConnectionManager.html
715    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
716    #[cfg(feature = "connection-manager")]
717    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
718    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
719    pub async fn get_connection_manager_with_backoff(
720        &self,
721        exponent_base: u64,
722        factor: u64,
723        number_of_retries: usize,
724    ) -> RedisResult<crate::aio::ConnectionManager> {
725        use crate::aio::ConnectionManagerConfig;
726
727        let config = ConnectionManagerConfig::new()
728            .set_exponent_base(exponent_base)
729            .set_factor(factor)
730            .set_number_of_retries(number_of_retries);
731        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
732    }
733
734    async fn get_multiplexed_async_connection_inner_with_timeout<T>(
735        &self,
736        config: &AsyncConnectionConfig,
737        rt: Runtime,
738    ) -> RedisResult<crate::aio::MultiplexedConnection>
739    where
740        T: crate::aio::RedisRuntime,
741    {
742        let result = if let Some(connection_timeout) = config.connection_timeout {
743            rt.timeout(
744                connection_timeout,
745                self.get_multiplexed_async_connection_inner::<T>(config),
746            )
747            .await
748        } else {
749            Ok(self
750                .get_multiplexed_async_connection_inner::<T>(config)
751                .await)
752        };
753
754        match result {
755            Ok(Ok(connection)) => Ok(connection),
756            Ok(Err(e)) => Err(e),
757            Err(elapsed) => Err(elapsed.into()),
758        }
759    }
760
761    async fn get_multiplexed_async_connection_inner<T>(
762        &self,
763        config: &AsyncConnectionConfig,
764    ) -> RedisResult<crate::aio::MultiplexedConnection>
765    where
766        T: crate::aio::RedisRuntime,
767    {
768        let (mut connection, driver) = self
769            .create_multiplexed_async_connection_inner::<T>(config)
770            .await?;
771        let handle = T::spawn(driver);
772        connection.set_task_handle(handle);
773        Ok(connection)
774    }
775
776    async fn create_multiplexed_async_connection_inner<T>(
777        &self,
778        config: &AsyncConnectionConfig,
779    ) -> RedisResult<(
780        crate::aio::MultiplexedConnection,
781        impl std::future::Future<Output = ()>,
782    )>
783    where
784        T: crate::aio::RedisRuntime,
785    {
786        let resolver = config
787            .dns_resolver
788            .as_deref()
789            .unwrap_or(&DefaultAsyncDNSResolver);
790        let con = self
791            .get_simple_async_connection::<T>(resolver, &config.tcp_settings)
792            .await?;
793        crate::aio::MultiplexedConnection::new_with_config(
794            &self.connection_info.redis,
795            con,
796            config.clone(),
797        )
798        .await
799    }
800
801    async fn get_simple_async_connection_dynamically(
802        &self,
803        dns_resolver: &dyn AsyncDNSResolver,
804        tcp_settings: &TcpSettings,
805    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>> {
806        match Runtime::locate() {
807            #[cfg(feature = "tokio-comp")]
808            Runtime::Tokio => {
809                self.get_simple_async_connection::<crate::aio::tokio::Tokio>(
810                    dns_resolver,
811                    tcp_settings,
812                )
813                .await
814            }
815
816            #[cfg(feature = "async-std-comp")]
817            Runtime::AsyncStd => {
818                self.get_simple_async_connection::<crate::aio::async_std::AsyncStd>(
819                    dns_resolver,
820                    tcp_settings,
821                )
822                .await
823            }
824
825            #[cfg(feature = "smol-comp")]
826            Runtime::Smol => {
827                self.get_simple_async_connection::<crate::aio::smol::Smol>(
828                    dns_resolver,
829                    tcp_settings,
830                )
831                .await
832            }
833        }
834    }
835
836    async fn get_simple_async_connection<T>(
837        &self,
838        dns_resolver: &dyn AsyncDNSResolver,
839        tcp_settings: &TcpSettings,
840    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>>
841    where
842        T: crate::aio::RedisRuntime,
843    {
844        Ok(
845            crate::aio::connect_simple::<T>(&self.connection_info, dns_resolver, tcp_settings)
846                .await?
847                .boxed(),
848        )
849    }
850
851    #[cfg(feature = "connection-manager")]
852    pub(crate) fn connection_info(&self) -> &ConnectionInfo {
853        &self.connection_info
854    }
855
856    /// Returns an async receiver for pub-sub messages.
857    #[cfg(feature = "aio")]
858    // TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later?
859    pub async fn get_async_pubsub(&self) -> RedisResult<crate::aio::PubSub> {
860        let connection = self
861            .get_simple_async_connection_dynamically(
862                &DefaultAsyncDNSResolver,
863                &TcpSettings::default(),
864            )
865            .await?;
866
867        crate::aio::PubSub::new(&self.connection_info.redis, connection).await
868    }
869
870    /// Returns an async receiver for monitor messages.
871    #[cfg(feature = "aio")]
872    pub async fn get_async_monitor(&self) -> RedisResult<crate::aio::Monitor> {
873        let connection = self
874            .get_simple_async_connection_dynamically(
875                &DefaultAsyncDNSResolver,
876                &TcpSettings::default(),
877            )
878            .await?;
879        crate::aio::Monitor::new(&self.connection_info.redis, connection).await
880    }
881}
882
883#[cfg(feature = "aio")]
884use crate::aio::Runtime;
885
886impl ConnectionLike for Client {
887    fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
888        self.get_connection()?.req_packed_command(cmd)
889    }
890
891    fn req_packed_commands(
892        &mut self,
893        cmd: &[u8],
894        offset: usize,
895        count: usize,
896    ) -> RedisResult<Vec<Value>> {
897        self.get_connection()?
898            .req_packed_commands(cmd, offset, count)
899    }
900
901    fn get_db(&self) -> i64 {
902        self.connection_info.redis.db
903    }
904
905    fn check_connection(&mut self) -> bool {
906        if let Ok(mut conn) = self.get_connection() {
907            conn.check_connection()
908        } else {
909            false
910        }
911    }
912
913    fn is_open(&self) -> bool {
914        if let Ok(conn) = self.get_connection() {
915            conn.is_open()
916        } else {
917            false
918        }
919    }
920}
921
922#[cfg(test)]
923mod test {
924    use super::*;
925
926    #[test]
927    fn regression_293_parse_ipv6_with_interface() {
928        assert!(Client::open(("fe80::cafe:beef%eno1", 6379)).is_ok());
929    }
930}