redis/client.rs
1use std::time::Duration;
2
3#[cfg(feature = "aio")]
4use crate::aio::{AsyncPushSender, DefaultAsyncDNSResolver};
5#[cfg(feature = "aio")]
6use crate::io::{tcp::TcpSettings, AsyncDNSResolver};
7use crate::{
8 connection::{connect, Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo},
9 types::{RedisResult, Value},
10};
11#[cfg(feature = "aio")]
12use std::pin::Pin;
13
14#[cfg(feature = "tls-rustls")]
15use crate::tls::{inner_build_with_tls, TlsCertificates};
16
17#[cfg(feature = "cache-aio")]
18use crate::caching::CacheConfig;
19#[cfg(all(
20 feature = "cache-aio",
21 any(feature = "connection-manager", feature = "cluster-async")
22))]
23use crate::caching::CacheManager;
24
/// The client type.
///
/// A `Client` stores the [`ConnectionInfo`] it was created with and uses it
/// whenever it is asked for a connection. [`Client::open`] only converts its
/// parameters into that connection info; the connection itself is made by the
/// `get_*` methods.
#[derive(Debug, Clone)]
pub struct Client {
    /// Connection parameters used when this client opens connections.
    pub(crate) connection_info: ConnectionInfo,
}
30
31/// The client acts as connector to the redis server. By itself it does not
32/// do much other than providing a convenient way to fetch a connection from
33/// it. In the future the plan is to provide a connection pool in the client.
34///
35/// When opening a client a URL in the following format should be used:
36///
37/// ```plain
38/// redis://host:port/db
39/// ```
40///
41/// Example usage::
42///
43/// ```rust,no_run
44/// let client = redis::Client::open("redis://127.0.0.1/").unwrap();
45/// let con = client.get_connection().unwrap();
46/// ```
47impl Client {
48 /// Connects to a redis server and returns a client. This does not
49 /// actually open a connection yet but it does perform some basic
50 /// checks on the URL that might make the operation fail.
51 pub fn open<T: IntoConnectionInfo>(params: T) -> RedisResult<Client> {
52 Ok(Client {
53 connection_info: params.into_connection_info()?,
54 })
55 }
56
57 /// Instructs the client to actually connect to redis and returns a
58 /// connection object. The connection object can be used to send
59 /// commands to the server. This can fail with a variety of errors
60 /// (like unreachable host) so it's important that you handle those
61 /// errors.
62 pub fn get_connection(&self) -> RedisResult<Connection> {
63 connect(&self.connection_info, None)
64 }
65
66 /// Instructs the client to actually connect to redis with specified
67 /// timeout and returns a connection object. The connection object
68 /// can be used to send commands to the server. This can fail with
69 /// a variety of errors (like unreachable host) so it's important
70 /// that you handle those errors.
71 pub fn get_connection_with_timeout(&self, timeout: Duration) -> RedisResult<Connection> {
72 connect(&self.connection_info, Some(timeout))
73 }
74
75 /// Returns a reference of client connection info object.
76 pub fn get_connection_info(&self) -> &ConnectionInfo {
77 &self.connection_info
78 }
79
80 /// Constructs a new `Client` with parameters necessary to create a TLS connection.
81 ///
82 /// - `conn_info` - URL using the `rediss://` scheme.
83 /// - `tls_certs` - `TlsCertificates` structure containing:
84 /// - `client_tls` - Optional `ClientTlsConfig` containing byte streams for
85 /// - `client_cert` - client's byte stream containing client certificate in PEM format
86 /// - `client_key` - client's byte stream containing private key in PEM format
87 /// - `root_cert` - Optional byte stream yielding PEM formatted file for root certificates.
88 ///
89 /// If `ClientTlsConfig` ( cert+key pair ) is not provided, then client-side authentication is not enabled.
90 /// If `root_cert` is not provided, then system root certificates are used instead.
91 ///
92 /// # Examples
93 ///
94 /// ```no_run
95 /// use std::{fs::File, io::{BufReader, Read}};
96 ///
97 /// use redis::{Client, AsyncCommands as _, TlsCertificates, ClientTlsConfig};
98 ///
99 /// async fn do_redis_code(
100 /// url: &str,
101 /// root_cert_file: &str,
102 /// cert_file: &str,
103 /// key_file: &str
104 /// ) -> redis::RedisResult<()> {
105 /// let root_cert_file = File::open(root_cert_file).expect("cannot open private cert file");
106 /// let mut root_cert_vec = Vec::new();
107 /// BufReader::new(root_cert_file)
108 /// .read_to_end(&mut root_cert_vec)
109 /// .expect("Unable to read ROOT cert file");
110 ///
111 /// let cert_file = File::open(cert_file).expect("cannot open private cert file");
112 /// let mut client_cert_vec = Vec::new();
113 /// BufReader::new(cert_file)
114 /// .read_to_end(&mut client_cert_vec)
115 /// .expect("Unable to read client cert file");
116 ///
117 /// let key_file = File::open(key_file).expect("cannot open private key file");
118 /// let mut client_key_vec = Vec::new();
119 /// BufReader::new(key_file)
120 /// .read_to_end(&mut client_key_vec)
121 /// .expect("Unable to read client key file");
122 ///
123 /// let client = Client::build_with_tls(
124 /// url,
125 /// TlsCertificates {
126 /// client_tls: Some(ClientTlsConfig{
127 /// client_cert: client_cert_vec,
128 /// client_key: client_key_vec,
129 /// }),
130 /// root_cert: Some(root_cert_vec),
131 /// }
132 /// )
133 /// .expect("Unable to build client");
134 ///
135 /// let connection_info = client.get_connection_info();
136 ///
137 /// println!(">>> connection info: {connection_info:?}");
138 ///
139 /// let mut con = client.get_multiplexed_async_connection().await?;
140 ///
141 /// con.set("key1", b"foo").await?;
142 ///
143 /// redis::cmd("SET")
144 /// .arg(&["key2", "bar"])
145 /// .exec_async(&mut con)
146 /// .await?;
147 ///
148 /// let result = redis::cmd("MGET")
149 /// .arg(&["key1", "key2"])
150 /// .query_async(&mut con)
151 /// .await;
152 /// assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
153 /// println!("Result from MGET: {result:?}");
154 ///
155 /// Ok(())
156 /// }
157 /// ```
158 #[cfg(feature = "tls-rustls")]
159 pub fn build_with_tls<C: IntoConnectionInfo>(
160 conn_info: C,
161 tls_certs: TlsCertificates,
162 ) -> RedisResult<Client> {
163 let connection_info = conn_info.into_connection_info()?;
164
165 inner_build_with_tls(connection_info, &tls_certs)
166 }
167}
168
/// Cache setting stored in `AsyncConnectionConfig::cache`.
#[cfg(feature = "cache-aio")]
#[derive(Clone)]
pub(crate) enum Cache {
    /// A cache configuration, as stored by `AsyncConnectionConfig::set_cache_config`.
    Config(CacheConfig),
    /// A cache manager, as stored by `AsyncConnectionConfig::set_cache_manager`.
    #[cfg(any(feature = "connection-manager", feature = "cluster-async"))]
    Manager(CacheManager),
}
176
/// Options for creation of async connection
///
/// Values are filled in through the `set_*` builder methods; a value created
/// with `new()` is the `Default` of every field.
#[cfg(feature = "aio")]
#[derive(Clone, Default)]
pub struct AsyncConnectionConfig {
    /// Maximum time to wait for a response from the server
    pub(crate) response_timeout: Option<std::time::Duration>,
    /// Maximum time to wait for a connection to be established
    pub(crate) connection_timeout: Option<std::time::Duration>,
    /// Sender for push values, if one was provided via `set_push_sender`
    pub(crate) push_sender: Option<std::sync::Arc<dyn AsyncPushSender>>,
    /// Cache configuration or cache manager, if one was provided
    #[cfg(feature = "cache-aio")]
    pub(crate) cache: Option<Cache>,
    /// Settings for the underlying TCP connection
    pub(crate) tcp_settings: TcpSettings,
    /// DNS resolver to use; when `None`, multiplexed connections fall back to
    /// `DefaultAsyncDNSResolver`
    pub(crate) dns_resolver: Option<std::sync::Arc<dyn AsyncDNSResolver>>,
}
191
#[cfg(feature = "aio")]
impl AsyncConnectionConfig {
    /// Builds a configuration in which no option has been set yet.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the maximum time to wait for the connection to be established.
    pub fn set_connection_timeout(self, connection_timeout: std::time::Duration) -> Self {
        Self {
            connection_timeout: Some(connection_timeout),
            ..self
        }
    }

    /// Sets the maximum time to wait for a response from the server.
    pub fn set_response_timeout(self, response_timeout: std::time::Duration) -> Self {
        Self {
            response_timeout: Some(response_timeout),
            ..self
        }
    }

    /// Sets the sender for push values.
    ///
    /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
    /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::protocol] field.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use redis::AsyncConnectionConfig;
    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    /// let config = AsyncConnectionConfig::new().set_push_sender(tx);
    /// ```
    ///
    /// ```rust
    /// # use std::sync::{Mutex, Arc};
    /// # use redis::AsyncConnectionConfig;
    /// let messages = Arc::new(Mutex::new(Vec::new()));
    /// let config = AsyncConnectionConfig::new().set_push_sender(move |msg|{
    ///     let Ok(mut messages) = messages.lock() else {
    ///         return Err(redis::aio::SendError);
    ///     };
    ///     messages.push(msg);
    ///     Ok(())
    /// });
    /// ```
    pub fn set_push_sender(self, sender: impl AsyncPushSender) -> Self {
        // Type-erase the sender before handing it to the shared internal setter.
        let shared: std::sync::Arc<dyn AsyncPushSender> = std::sync::Arc::new(sender);
        self.set_push_sender_internal(shared)
    }

    /// Stores an already shared, type-erased push sender.
    pub(crate) fn set_push_sender_internal(
        self,
        sender: std::sync::Arc<dyn AsyncPushSender>,
    ) -> Self {
        Self {
            push_sender: Some(sender),
            ..self
        }
    }

    /// Sets cache config for MultiplexedConnection, check CacheConfig for more details.
    #[cfg(feature = "cache-aio")]
    pub fn set_cache_config(self, cache_config: CacheConfig) -> Self {
        Self {
            cache: Some(Cache::Config(cache_config)),
            ..self
        }
    }

    /// Stores a cache manager instead of a plain cache configuration.
    #[cfg(all(
        feature = "cache-aio",
        any(feature = "connection-manager", feature = "cluster-async")
    ))]
    pub(crate) fn set_cache_manager(self, cache_manager: CacheManager) -> Self {
        Self {
            cache: Some(Cache::Manager(cache_manager)),
            ..self
        }
    }

    /// Set the behavior of the underlying TCP connection.
    pub fn set_tcp_settings(mut self, tcp_settings: crate::io::tcp::TcpSettings) -> Self {
        self.tcp_settings = tcp_settings;
        self
    }

    /// Set the DNS resolver for the underlying TCP connection.
    ///
    /// The parameter resolver must implement the [`crate::io::AsyncDNSResolver`] trait.
    pub fn set_dns_resolver(self, dns_resolver: impl AsyncDNSResolver) -> Self {
        // Type-erase the resolver before handing it to the shared internal setter.
        let shared: std::sync::Arc<dyn AsyncDNSResolver> = std::sync::Arc::new(dns_resolver);
        self.set_dns_resolver_internal(shared)
    }

    /// Stores an already shared, type-erased DNS resolver.
    pub(super) fn set_dns_resolver_internal(
        self,
        dns_resolver: std::sync::Arc<dyn AsyncDNSResolver>,
    ) -> Self {
        Self {
            dns_resolver: Some(dns_resolver),
            ..self
        }
    }
}
287
/// To enable async support you need to choose one of the supported runtimes and activate its
/// corresponding feature: `tokio-comp`, `async-std-comp` or `smol-comp`
#[cfg(feature = "aio")]
#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
impl Client {
    /// Returns an async connection from the client.
    ///
    /// Shorthand for [`Self::get_multiplexed_async_connection_with_config`] called with a
    /// newly created [`AsyncConnectionConfig`].
    #[cfg(feature = "aio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
    pub async fn get_multiplexed_async_connection(
        &self,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        self.get_multiplexed_async_connection_with_config(&AsyncConnectionConfig::new())
            .await
    }

    /// Returns an async connection from the client.
    ///
    /// `response_timeout` and `connection_timeout` are placed into an
    /// [`AsyncConnectionConfig`] which is then passed to
    /// [`Self::get_multiplexed_async_connection_with_config`].
    #[cfg(feature = "aio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
    #[deprecated(note = "Use `get_multiplexed_async_connection_with_config` instead")]
    pub async fn get_multiplexed_async_connection_with_timeouts(
        &self,
        response_timeout: std::time::Duration,
        connection_timeout: std::time::Duration,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        self.get_multiplexed_async_connection_with_config(
            &AsyncConnectionConfig::new()
                .set_connection_timeout(connection_timeout)
                .set_response_timeout(response_timeout),
        )
        .await
    }

    /// Returns an async connection from the client.
    ///
    /// The runtime is chosen with `Runtime::locate()`; the connection is then created for
    /// that runtime using the options in `config`. If `config` carries a connection timeout,
    /// the whole connection attempt is bounded by it.
    #[cfg(feature = "aio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
    pub async fn get_multiplexed_async_connection_with_config(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        // One arm per runtime feature; each arm only exists when its feature is enabled.
        match Runtime::locate() {
            #[cfg(feature = "tokio-comp")]
            rt @ Runtime::Tokio => self
                .get_multiplexed_async_connection_inner_with_timeout::<crate::aio::tokio::Tokio>(
                    config, rt,
                )
                .await,

            #[cfg(feature = "async-std-comp")]
            rt @ Runtime::AsyncStd => self.get_multiplexed_async_connection_inner_with_timeout::<
                crate::aio::async_std::AsyncStd,
            >(config, rt)
            .await,

            #[cfg(feature = "smol-comp")]
            rt @ Runtime::Smol => self.get_multiplexed_async_connection_inner_with_timeout::<
                crate::aio::smol::Smol,
            >(config, rt)
            .await,
        }
    }

    /// Returns an async multiplexed connection from the client.
    ///
    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// The connection attempt is bounded by `connection_timeout`; `response_timeout` is
    /// passed on through the connection's [`AsyncConnectionConfig`].
    #[cfg(feature = "tokio-comp")]
    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
    pub async fn get_multiplexed_tokio_connection_with_response_timeouts(
        &self,
        response_timeout: std::time::Duration,
        connection_timeout: std::time::Duration,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        let result = Runtime::locate()
            .timeout(
                connection_timeout,
                self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
                    &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
                ),
            )
            .await;

        // Flatten: the outer layer is the timeout, the inner layer is the connection result.
        match result {
            Ok(Ok(connection)) => Ok(connection),
            Ok(Err(e)) => Err(e),
            Err(elapsed) => Err(elapsed.into()),
        }
    }

    /// Returns an async multiplexed connection from the client.
    ///
    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    #[cfg(feature = "tokio-comp")]
    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
    pub async fn get_multiplexed_tokio_connection(
        &self,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
            &AsyncConnectionConfig::new(),
        )
        .await
    }

    /// Returns an async multiplexed connection from the client.
    ///
    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// The connection attempt is bounded by `connection_timeout`; `response_timeout` is
    /// passed on through the connection's [`AsyncConnectionConfig`].
    #[cfg(feature = "async-std-comp")]
    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
    pub async fn get_multiplexed_async_std_connection_with_timeouts(
        &self,
        response_timeout: std::time::Duration,
        connection_timeout: std::time::Duration,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        let result = Runtime::locate()
            .timeout(
                connection_timeout,
                self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
                    &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
                ),
            )
            .await;

        // Flatten: the outer layer is the timeout, the inner layer is the connection result.
        match result {
            Ok(Ok(connection)) => Ok(connection),
            Ok(Err(e)) => Err(e),
            Err(elapsed) => Err(elapsed.into()),
        }
    }

    /// Returns an async multiplexed connection from the client.
    ///
    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    #[cfg(feature = "async-std-comp")]
    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
    pub async fn get_multiplexed_async_std_connection(
        &self,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
            &AsyncConnectionConfig::new(),
        )
        .await
    }

    /// Returns an async multiplexed connection from the client and a future which must be polled
    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
    ///
    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    /// The multiplexer will return a timeout error on any request that takes longer than `response_timeout`.
    #[cfg(feature = "tokio-comp")]
    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
    pub async fn create_multiplexed_tokio_connection_with_response_timeout(
        &self,
        response_timeout: std::time::Duration,
    ) -> RedisResult<(
        crate::aio::MultiplexedConnection,
        impl std::future::Future<Output = ()>,
    )> {
        self.create_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
            &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
        )
        .await
    }

    /// Returns an async multiplexed connection from the client and a future which must be polled
    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
    ///
    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    #[cfg(feature = "tokio-comp")]
    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
    pub async fn create_multiplexed_tokio_connection(
        &self,
    ) -> RedisResult<(
        crate::aio::MultiplexedConnection,
        impl std::future::Future<Output = ()>,
    )> {
        self.create_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
            &AsyncConnectionConfig::new(),
        )
        .await
    }

    /// Returns an async multiplexed connection from the client and a future which must be polled
    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
    ///
    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    /// The multiplexer will return a timeout error on any request that takes longer than `response_timeout`.
    #[cfg(feature = "async-std-comp")]
    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
    pub async fn create_multiplexed_async_std_connection_with_response_timeout(
        &self,
        response_timeout: std::time::Duration,
    ) -> RedisResult<(
        crate::aio::MultiplexedConnection,
        impl std::future::Future<Output = ()>,
    )> {
        self.create_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
            &AsyncConnectionConfig::new().set_response_timeout(response_timeout),
        )
        .await
    }

    /// Returns an async multiplexed connection from the client and a future which must be polled
    /// to drive any requests submitted to it (see [Self::get_multiplexed_async_connection]).
    ///
    /// A multiplexed connection can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    #[cfg(feature = "async-std-comp")]
    #[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
    pub async fn create_multiplexed_async_std_connection(
        &self,
    ) -> RedisResult<(
        crate::aio::MultiplexedConnection,
        impl std::future::Future<Output = ()>,
    )> {
        self.create_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
            &AsyncConnectionConfig::new(),
        )
        .await
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    #[deprecated(note = "use get_connection_manager instead")]
    pub async fn get_tokio_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
        crate::aio::ConnectionManager::new(self.clone()).await
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    pub async fn get_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
        crate::aio::ConnectionManager::new(self.clone()).await
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
    pub async fn get_tokio_connection_manager_with_backoff(
        &self,
        exponent_base: u64,
        factor: u64,
        number_of_retries: usize,
    ) -> RedisResult<crate::aio::ConnectionManager> {
        use crate::aio::ConnectionManagerConfig;

        let config = ConnectionManagerConfig::new()
            .set_exponent_base(exponent_base)
            .set_factor(factor)
            .set_number_of_retries(number_of_retries);
        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
    pub async fn get_tokio_connection_manager_with_backoff_and_timeouts(
        &self,
        exponent_base: u64,
        factor: u64,
        number_of_retries: usize,
        response_timeout: std::time::Duration,
        connection_timeout: std::time::Duration,
    ) -> RedisResult<crate::aio::ConnectionManager> {
        use crate::aio::ConnectionManagerConfig;

        let config = ConnectionManagerConfig::new()
            .set_exponent_base(exponent_base)
            .set_factor(factor)
            .set_response_timeout(response_timeout)
            .set_connection_timeout(connection_timeout)
            .set_number_of_retries(number_of_retries);
        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
    pub async fn get_connection_manager_with_backoff_and_timeouts(
        &self,
        exponent_base: u64,
        factor: u64,
        number_of_retries: usize,
        response_timeout: std::time::Duration,
        connection_timeout: std::time::Duration,
    ) -> RedisResult<crate::aio::ConnectionManager> {
        use crate::aio::ConnectionManagerConfig;

        let config = ConnectionManagerConfig::new()
            .set_exponent_base(exponent_base)
            .set_factor(factor)
            .set_response_timeout(response_timeout)
            .set_connection_timeout(connection_timeout)
            .set_number_of_retries(number_of_retries);
        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    pub async fn get_connection_manager_with_config(
        &self,
        config: crate::aio::ConnectionManagerConfig,
    ) -> RedisResult<crate::aio::ConnectionManager> {
        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    #[deprecated(note = "Use `get_connection_manager_with_config` instead")]
    pub async fn get_connection_manager_with_backoff(
        &self,
        exponent_base: u64,
        factor: u64,
        number_of_retries: usize,
    ) -> RedisResult<crate::aio::ConnectionManager> {
        use crate::aio::ConnectionManagerConfig;

        let config = ConnectionManagerConfig::new()
            .set_exponent_base(exponent_base)
            .set_factor(factor)
            .set_number_of_retries(number_of_retries);
        crate::aio::ConnectionManager::new_with_config(self.clone(), config).await
    }

    /// Creates a multiplexed connection on runtime `T`, bounding the attempt by
    /// `config.connection_timeout` when that is set.
    ///
    /// `rt` is only used to run the timeout. Connection errors are returned unchanged; an
    /// elapsed timeout is converted into the returned error type with `Into`.
    async fn get_multiplexed_async_connection_inner_with_timeout<T>(
        &self,
        config: &AsyncConnectionConfig,
        rt: Runtime,
    ) -> RedisResult<crate::aio::MultiplexedConnection>
    where
        T: crate::aio::RedisRuntime,
    {
        let result = if let Some(connection_timeout) = config.connection_timeout {
            rt.timeout(
                connection_timeout,
                self.get_multiplexed_async_connection_inner::<T>(config),
            )
            .await
        } else {
            // No timeout configured: wrap in `Ok` so both branches share the nested shape.
            Ok(self
                .get_multiplexed_async_connection_inner::<T>(config)
                .await)
        };

        match result {
            Ok(Ok(connection)) => Ok(connection),
            Ok(Err(e)) => Err(e),
            Err(elapsed) => Err(elapsed.into()),
        }
    }

    /// Creates a multiplexed connection together with its driver future, spawns the driver
    /// with `T::spawn` and stores the returned task handle on the connection.
    async fn get_multiplexed_async_connection_inner<T>(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<crate::aio::MultiplexedConnection>
    where
        T: crate::aio::RedisRuntime,
    {
        let (mut connection, driver) = self
            .create_multiplexed_async_connection_inner::<T>(config)
            .await?;
        let handle = T::spawn(driver);
        connection.set_task_handle(handle);
        Ok(connection)
    }

    /// Opens the underlying stream and builds a multiplexed connection on top of it.
    ///
    /// The DNS resolver from `config` is used when one is set, otherwise
    /// `DefaultAsyncDNSResolver`. The driver future is returned to the caller and is not
    /// spawned here.
    async fn create_multiplexed_async_connection_inner<T>(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<(
        crate::aio::MultiplexedConnection,
        impl std::future::Future<Output = ()>,
    )>
    where
        T: crate::aio::RedisRuntime,
    {
        let resolver = config
            .dns_resolver
            .as_deref()
            .unwrap_or(&DefaultAsyncDNSResolver);
        let con = self
            .get_simple_async_connection::<T>(resolver, &config.tcp_settings)
            .await?;
        crate::aio::MultiplexedConnection::new_with_config(
            &self.connection_info.redis,
            con,
            config.clone(),
        )
        .await
    }

    /// Same as `get_simple_async_connection`, with the runtime type chosen at run time via
    /// `Runtime::locate()`.
    async fn get_simple_async_connection_dynamically(
        &self,
        dns_resolver: &dyn AsyncDNSResolver,
        tcp_settings: &TcpSettings,
    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>> {
        match Runtime::locate() {
            #[cfg(feature = "tokio-comp")]
            Runtime::Tokio => {
                self.get_simple_async_connection::<crate::aio::tokio::Tokio>(
                    dns_resolver,
                    tcp_settings,
                )
                .await
            }

            #[cfg(feature = "async-std-comp")]
            Runtime::AsyncStd => {
                self.get_simple_async_connection::<crate::aio::async_std::AsyncStd>(
                    dns_resolver,
                    tcp_settings,
                )
                .await
            }

            #[cfg(feature = "smol-comp")]
            Runtime::Smol => {
                self.get_simple_async_connection::<crate::aio::smol::Smol>(
                    dns_resolver,
                    tcp_settings,
                )
                .await
            }
        }
    }

    /// Connects with `crate::aio::connect_simple` on runtime `T` and returns the resulting
    /// stream in boxed form.
    async fn get_simple_async_connection<T>(
        &self,
        dns_resolver: &dyn AsyncDNSResolver,
        tcp_settings: &TcpSettings,
    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>>
    where
        T: crate::aio::RedisRuntime,
    {
        Ok(
            crate::aio::connect_simple::<T>(&self.connection_info, dns_resolver, tcp_settings)
                .await?
                .boxed(),
        )
    }

    /// Crate-internal accessor for the connection info held by this client.
    #[cfg(feature = "connection-manager")]
    pub(crate) fn connection_info(&self) -> &ConnectionInfo {
        &self.connection_info
    }

    /// Returns an async receiver for pub-sub messages.
    ///
    /// The underlying connection is opened with `DefaultAsyncDNSResolver` and default
    /// `TcpSettings`.
    #[cfg(feature = "aio")]
    // TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later?
    pub async fn get_async_pubsub(&self) -> RedisResult<crate::aio::PubSub> {
        let connection = self
            .get_simple_async_connection_dynamically(
                &DefaultAsyncDNSResolver,
                &TcpSettings::default(),
            )
            .await?;

        crate::aio::PubSub::new(&self.connection_info.redis, connection).await
    }

    /// Returns an async receiver for monitor messages.
    ///
    /// The underlying connection is opened with `DefaultAsyncDNSResolver` and default
    /// `TcpSettings`.
    #[cfg(feature = "aio")]
    pub async fn get_async_monitor(&self) -> RedisResult<crate::aio::Monitor> {
        let connection = self
            .get_simple_async_connection_dynamically(
                &DefaultAsyncDNSResolver,
                &TcpSettings::default(),
            )
            .await?;
        crate::aio::Monitor::new(&self.connection_info.redis, connection).await
    }
}
882
883#[cfg(feature = "aio")]
884use crate::aio::Runtime;
885
886impl ConnectionLike for Client {
887 fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
888 self.get_connection()?.req_packed_command(cmd)
889 }
890
891 fn req_packed_commands(
892 &mut self,
893 cmd: &[u8],
894 offset: usize,
895 count: usize,
896 ) -> RedisResult<Vec<Value>> {
897 self.get_connection()?
898 .req_packed_commands(cmd, offset, count)
899 }
900
901 fn get_db(&self) -> i64 {
902 self.connection_info.redis.db
903 }
904
905 fn check_connection(&mut self) -> bool {
906 if let Ok(mut conn) = self.get_connection() {
907 conn.check_connection()
908 } else {
909 false
910 }
911 }
912
913 fn is_open(&self) -> bool {
914 if let Ok(conn) = self.get_connection() {
915 conn.is_open()
916 } else {
917 false
918 }
919 }
920}
921
#[cfg(test)]
mod test {
    use super::*;

    /// A host given as an IPv6 address with an interface suffix (`%eno1`)
    /// must be accepted by `Client::open`.
    #[test]
    fn regression_293_parse_ipv6_with_interface() {
        let host_and_port = ("fe80::cafe:beef%eno1", 6379);
        let opened = Client::open(host_and_port);
        assert!(opened.is_ok());
    }
}