Skip to main content

redis/
connection.rs

1use std::borrow::Cow;
2use std::collections::VecDeque;
3use std::fmt;
4use std::io::{self, Write};
5use std::net::{self, SocketAddr, TcpStream, ToSocketAddrs};
6use std::ops::DerefMut;
7use std::path::PathBuf;
8use std::str::{FromStr, from_utf8};
9use std::time::{Duration, Instant};
10
11use crate::cmd::{Cmd, cmd, pipe};
12use crate::errors::{ErrorKind, RedisError, ServerError, ServerErrorKind};
13use crate::io::tcp::{TcpSettings, stream_with_settings};
14use crate::parser::Parser;
15use crate::pipeline::Pipeline;
16use crate::types::{
17    FromRedisValue, HashMap, PushKind, RedisResult, SyncPushSender, ToRedisArgs, Value,
18    from_redis_value_ref,
19};
20use crate::{ProtocolVersion, check_resp3, from_redis_value};
21
22#[cfg(unix)]
23use std::os::unix::net::UnixStream;
24
25use crate::commands::resp3_hello;
26use arcstr::ArcStr;
27#[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
28use native_tls::{TlsConnector, TlsStream};
29
30#[cfg(feature = "tls-rustls")]
31use rustls::{RootCertStore, StreamOwned};
32#[cfg(feature = "tls-rustls")]
33use std::sync::Arc;
34
35use crate::PushInfo;
36
37#[cfg(all(
38    feature = "tls-rustls",
39    not(feature = "tls-native-tls"),
40    not(feature = "tls-rustls-webpki-roots")
41))]
42use rustls_native_certs::load_native_certs;
43
44#[cfg(feature = "tls-rustls")]
45use crate::tls::ClientTlsParams;
46
/// TLS parameters that can be attached to a `ConnectionAddr::TcpTls` address.
///
/// Which fields exist depends on the TLS features the crate is compiled with.
// Every field is `pub(crate)`, which prevents construction outside this crate.
#[derive(Clone, Debug)]
pub struct TlsConnParams {
    /// Optional client-side TLS parameters (see `ClientTlsParams`); `tls-rustls` only.
    #[cfg(feature = "tls-rustls")]
    pub(crate) client_tls_params: Option<ClientTlsParams>,
    /// Optional root certificate store; `tls-rustls` only.
    #[cfg(feature = "tls-rustls")]
    pub(crate) root_cert_store: Option<RootCertStore>,
    /// Flag written by `ConnectionAddr::set_danger_accept_invalid_hostnames`.
    #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
    pub(crate) danger_accept_invalid_hostnames: bool,
}
57
/// Port used by `url_to_tcp_connection_info` when the URL does not specify one.
static DEFAULT_PORT: u16 = 6379;

/// Default library name to connect with
const DEFAULT_CLIENT_SETINFO_LIB_NAME: &str = "redis-rs";
/// Default library version to connect with (this crate's `CARGO_PKG_VERSION`, captured at compile time)
const DEFAULT_CLIENT_SETINFO_LIB_VER: &str = env!("CARGO_PKG_VERSION");
64
65#[inline(always)]
66fn connect_tcp(addr: (&str, u16), tcp_settings: &TcpSettings) -> io::Result<TcpStream> {
67    let socket = TcpStream::connect(addr)?;
68    stream_with_settings(socket, tcp_settings)
69}
70
71#[inline(always)]
72fn connect_tcp_timeout(
73    addr: &SocketAddr,
74    timeout: Duration,
75    tcp_settings: &TcpSettings,
76) -> io::Result<TcpStream> {
77    let socket = TcpStream::connect_timeout(addr, timeout)?;
78    stream_with_settings(socket, tcp_settings)
79}
80
81/// This function takes a redis URL string and parses it into a URL
82/// as used by rust-url.
83///
84/// This is necessary as the default parser does not understand how redis URLs function.
85pub fn parse_redis_url(input: &str) -> Option<url::Url> {
86    match url::Url::parse(input) {
87        Ok(result) => match result.scheme() {
88            "redis" | "rediss" | "valkey" | "valkeys" | "redis+unix" | "valkey+unix" | "unix" => {
89                Some(result)
90            }
91            _ => None,
92        },
93        Err(_) => None,
94    }
95}
96
/// TlsMode indicates whether or not the certificate is verified.
///
/// Check [ConnectionAddr](ConnectionAddr::TcpTls::insecure) for more.
#[derive(Clone, Copy, PartialEq)]
#[non_exhaustive]
pub enum TlsMode {
    /// Secure: verify the certificate.
    Secure,
    /// Insecure: do not verify the certificate.
    Insecure,
}
108
/// Defines the connection address.
///
/// Not all connection addresses are supported on all platforms.  For instance
/// to connect to a unix socket you need to run this on an operating system
/// that supports them.
///
/// Equality compares host, port and `insecure` (or the socket path);
/// `tls_params` is ignored when comparing two `TcpTls` addresses.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ConnectionAddr {
    /// Plain TCP address. Format for this is `(host, port)`.
    Tcp(String, u16),
    /// TCP address that is connected to using TLS, identified by `host` and `port`.
    TcpTls {
        /// Hostname
        host: String,
        /// Port
        port: u16,
        /// Disable hostname verification when connecting.
        ///
        /// # Warning
        ///
        /// You should think very carefully before you use this method. If hostname
        /// verification is not used, any valid certificate for any site will be
        /// trusted for use from any other. This introduces a significant
        /// vulnerability to man-in-the-middle attacks.
        insecure: bool,

        /// TLS certificates and client key.
        ///
        /// Not taken into account by this type's `PartialEq` implementation.
        tls_params: Option<TlsConnParams>,
    },
    /// Format for this is the path to the unix socket.
    Unix(PathBuf),
}
141
142impl PartialEq for ConnectionAddr {
143    fn eq(&self, other: &Self) -> bool {
144        match (self, other) {
145            (ConnectionAddr::Tcp(host1, port1), ConnectionAddr::Tcp(host2, port2)) => {
146                host1 == host2 && port1 == port2
147            }
148            (
149                ConnectionAddr::TcpTls {
150                    host: host1,
151                    port: port1,
152                    insecure: insecure1,
153                    tls_params: _,
154                },
155                ConnectionAddr::TcpTls {
156                    host: host2,
157                    port: port2,
158                    insecure: insecure2,
159                    tls_params: _,
160                },
161            ) => port1 == port2 && host1 == host2 && insecure1 == insecure2,
162            (ConnectionAddr::Unix(path1), ConnectionAddr::Unix(path2)) => path1 == path2,
163            _ => false,
164        }
165    }
166}
167
168impl Eq for ConnectionAddr {}
169
170impl ConnectionAddr {
171    /// Checks if this address is supported.
172    ///
173    /// Because not all platforms support all connection addresses this is a
174    /// quick way to figure out if a connection method is supported. Currently
175    /// this affects:
176    ///
177    /// - Unix socket addresses, which are supported only on Unix
178    ///
179    /// - TLS addresses, which are supported only if a TLS feature is enabled
180    ///   (either `tls-native-tls` or `tls-rustls`).
181    pub fn is_supported(&self) -> bool {
182        match *self {
183            ConnectionAddr::Tcp(_, _) => true,
184            ConnectionAddr::TcpTls { .. } => {
185                cfg!(any(feature = "tls-native-tls", feature = "tls-rustls"))
186            }
187            ConnectionAddr::Unix(_) => cfg!(unix),
188        }
189    }
190
191    /// Configure this address to connect without checking certificate hostnames.
192    ///
193    /// # Warning
194    ///
195    /// You should think very carefully before you use this method. If hostname
196    /// verification is not used, any valid certificate for any site will be
197    /// trusted for use from any other. This introduces a significant
198    /// vulnerability to man-in-the-middle attacks.
199    #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
200    pub fn set_danger_accept_invalid_hostnames(&mut self, insecure: bool) {
201        if let ConnectionAddr::TcpTls { tls_params, .. } = self {
202            if let Some(params) = tls_params {
203                params.danger_accept_invalid_hostnames = insecure;
204            } else if insecure {
205                *tls_params = Some(TlsConnParams {
206                    #[cfg(feature = "tls-rustls")]
207                    client_tls_params: None,
208                    #[cfg(feature = "tls-rustls")]
209                    root_cert_store: None,
210                    danger_accept_invalid_hostnames: insecure,
211                });
212            }
213        }
214    }
215
216    #[cfg(feature = "cluster")]
217    pub(crate) fn tls_mode(&self) -> Option<TlsMode> {
218        match self {
219            ConnectionAddr::TcpTls { insecure, .. } => {
220                if *insecure {
221                    Some(TlsMode::Insecure)
222                } else {
223                    Some(TlsMode::Secure)
224                }
225            }
226            _ => None,
227        }
228    }
229}
230
231impl fmt::Display for ConnectionAddr {
232    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
233        // Cluster::get_connection_info depends on the return value from this function
234        match *self {
235            ConnectionAddr::Tcp(ref host, port) => write!(f, "{host}:{port}"),
236            ConnectionAddr::TcpTls { ref host, port, .. } => write!(f, "{host}:{port}"),
237            ConnectionAddr::Unix(ref path) => write!(f, "{}", path.display()),
238        }
239    }
240}
241
/// Holds the connection information that redis should use for connecting.
///
/// It combines the address to connect to, the TCP settings, and the
/// redis-level handshake information.
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
    /// A connection address for where to connect to.
    pub(crate) addr: ConnectionAddr,

    /// The settings for the TCP connection
    pub(crate) tcp_settings: TcpSettings,
    /// A redis connection info for how to handshake with redis.
    pub(crate) redis: RedisConnectionInfo,
}
253
254impl ConnectionInfo {
255    /// Returns the connection address.
256    pub fn addr(&self) -> &ConnectionAddr {
257        &self.addr
258    }
259
260    /// Returns the settings for the TCP connection.
261    pub fn tcp_settings(&self) -> &TcpSettings {
262        &self.tcp_settings
263    }
264
265    /// Returns the redis connection info for how to handshake with redis.
266    pub fn redis_settings(&self) -> &RedisConnectionInfo {
267        &self.redis
268    }
269
270    /// Sets the connection address for where to connect to.
271    pub fn set_addr(mut self, addr: ConnectionAddr) -> Self {
272        self.addr = addr;
273        self
274    }
275
276    /// Sets the TCP settings for the connection.
277    pub fn set_tcp_settings(mut self, tcp_settings: TcpSettings) -> Self {
278        self.tcp_settings = tcp_settings;
279        self
280    }
281
282    /// Set all redis connection info fields at once.
283    pub fn set_redis_settings(mut self, redis: RedisConnectionInfo) -> Self {
284        self.redis = redis;
285        self
286    }
287}
288
/// Redis specific/connection independent information used to establish a connection to redis.
#[derive(Clone, Default)]
pub struct RedisConnectionInfo {
    /// The database number to use.  This is usually `0`.
    pub(crate) db: i64,
    /// Optionally a username that should be used for connection.
    pub(crate) username: Option<ArcStr>,
    /// Optionally a password that should be used for connection.
    pub(crate) password: Option<ArcStr>,
    /// Version of the protocol to use.
    pub(crate) protocol: ProtocolVersion,
    /// If set, the connection shouldn't send the library name and version to the server.
    pub(crate) skip_set_lib_name: bool,
    /// Library name to send to the server after connecting (if [`Self::skip_set_lib_name`] is `false`)
    pub(crate) lib_name: Option<ArcStr>,
    /// Library version to send to the server after connecting (if [`Self::skip_set_lib_name`] is `false`)
    pub(crate) lib_ver: Option<ArcStr>,
}
307
308impl RedisConnectionInfo {
309    /// Returns the username that should be used for connection.
310    pub fn username(&self) -> Option<&str> {
311        self.username.as_deref()
312    }
313
314    /// Returns the password that should be used for connection.
315    pub fn password(&self) -> Option<&str> {
316        self.password.as_deref()
317    }
318
319    /// Returns version of the protocol to use.
320    pub fn protocol(&self) -> ProtocolVersion {
321        self.protocol
322    }
323
324    /// Returns `true` if the `CLIENT SETINFO` command should be skipped.
325    pub fn skip_set_lib_name(&self) -> bool {
326        self.skip_set_lib_name
327    }
328
329    /// Returns the set library name
330    pub fn lib_name(&self) -> Option<&str> {
331        self.lib_name.as_deref()
332    }
333
334    /// Returns the set library version
335    pub fn lib_ver(&self) -> Option<&str> {
336        self.lib_ver.as_deref()
337    }
338
339    /// Returns the database number to use.
340    pub fn db(&self) -> i64 {
341        self.db
342    }
343
344    /// Sets the username for the connection's ACL.
345    pub fn set_username(mut self, username: impl AsRef<str>) -> Self {
346        self.username = Some(username.as_ref().into());
347        self
348    }
349
350    /// Sets the password for the connection's ACL.
351    pub fn set_password(mut self, password: impl AsRef<str>) -> Self {
352        self.password = Some(password.as_ref().into());
353        self
354    }
355
356    /// Sets the version of the RESP to use.
357    pub fn set_protocol(mut self, protocol: ProtocolVersion) -> Self {
358        self.protocol = protocol;
359        self
360    }
361
362    /// Removes the pipelined `CLIENT SETINFO` call from the connection creation.
363    ///
364    /// This function makes previously set [`lib_name`](Self::lib_name) and
365    /// [`lib_ver`](Self::lib_ver) ineffective.
366    pub fn set_skip_set_lib_name(mut self) -> Self {
367        self.skip_set_lib_name = true;
368        self
369    }
370
371    /// Sets the library information and enables sending it when connecting.
372    ///
373    /// This function clears [`skip_set_lib_name`](Self::skip_set_lib_name).
374    pub fn set_lib_name(mut self, lib_name: impl AsRef<str>, lib_ver: impl AsRef<str>) -> Self {
375        self.lib_name = Some(lib_name.as_ref().into());
376        self.lib_ver = Some(lib_ver.as_ref().into());
377        self.skip_set_lib_name = false;
378        self
379    }
380
381    /// Sets the database number to use.
382    pub fn set_db(mut self, db: i64) -> Self {
383        self.db = db;
384        self
385    }
386}
387
388impl std::fmt::Debug for RedisConnectionInfo {
389    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390        let RedisConnectionInfo {
391            db,
392            username,
393            password,
394            protocol,
395            skip_set_lib_name,
396            lib_name,
397            lib_ver,
398        } = self;
399        let mut debug_info = f.debug_struct("RedisConnectionInfo");
400
401        debug_info.field("db", &db);
402        debug_info.field("username", &username);
403        debug_info.field("password", &password.as_ref().map(|_| "<redacted>"));
404        debug_info.field("protocol", &protocol);
405        debug_info.field("skip_set_lib_name", &skip_set_lib_name);
406        debug_info.field("lib_name", &lib_name);
407        debug_info.field("lib_ver", &lib_ver);
408
409        debug_info.finish()
410    }
411}
412
413impl FromStr for ConnectionInfo {
414    type Err = RedisError;
415
416    fn from_str(s: &str) -> Result<Self, Self::Err> {
417        s.into_connection_info()
418    }
419}
420
/// Converts an object into a connection info struct.  This allows the
/// constructor of the client to accept connection information in a
/// range of different formats.
///
/// Implementations in this file cover [`ConnectionInfo`], [`ConnectionAddr`],
/// `&str`, `String`, `(T, u16)` host/port pairs and [`url::Url`].
pub trait IntoConnectionInfo {
    /// Converts the object into a connection info object.
    ///
    /// Returns an error when the object does not describe a usable connection.
    fn into_connection_info(self) -> RedisResult<ConnectionInfo>;
}
428
/// Identity conversion: a [`ConnectionInfo`] is already in its final form.
impl IntoConnectionInfo for ConnectionInfo {
    /// Returns `self` unchanged; this conversion never fails.
    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
        Ok(self)
    }
}
434
435impl IntoConnectionInfo for ConnectionAddr {
436    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
437        Ok(ConnectionInfo {
438            addr: self,
439            redis: Default::default(),
440            tcp_settings: Default::default(),
441        })
442    }
443}
444
445/// URL format: `{redis|rediss|valkey|valkeys}://[<username>][:<password>@]<hostname>[:port][/<db>]`
446///
447/// - Basic: `redis://127.0.0.1:6379`
448/// - Username & Password: `redis://user:password@127.0.0.1:6379`
449/// - Password only: `redis://:password@127.0.0.1:6379`
450/// - Specifying DB: `redis://127.0.0.1:6379/0`
451/// - Enabling TLS: `rediss://127.0.0.1:6379`
452/// - Enabling Insecure TLS: `rediss://127.0.0.1:6379/#insecure`
453/// - Enabling RESP3: `redis://127.0.0.1:6379/?protocol=resp3`
454impl IntoConnectionInfo for &str {
455    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
456        match parse_redis_url(self) {
457            Some(u) => u.into_connection_info(),
458            None => fail!((ErrorKind::InvalidClientConfig, "Redis URL did not parse")),
459        }
460    }
461}
462
463impl<T> IntoConnectionInfo for (T, u16)
464where
465    T: Into<String>,
466{
467    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
468        Ok(ConnectionInfo {
469            addr: ConnectionAddr::Tcp(self.0.into(), self.1),
470            redis: RedisConnectionInfo::default(),
471            tcp_settings: TcpSettings::default(),
472        })
473    }
474}
475
476/// URL format: `{redis|rediss|valkey|valkeys}://[<username>][:<password>@]<hostname>[:port][/<db>]`
477///
478/// - Basic: `redis://127.0.0.1:6379`
479/// - Username & Password: `redis://user:password@127.0.0.1:6379`
480/// - Password only: `redis://:password@127.0.0.1:6379`
481/// - Specifying DB: `redis://127.0.0.1:6379/0`
482/// - Enabling TLS: `rediss://127.0.0.1:6379`
483/// - Enabling Insecure TLS: `rediss://127.0.0.1:6379/#insecure`
484/// - Enabling RESP3: `redis://127.0.0.1:6379/?protocol=resp3`
485impl IntoConnectionInfo for String {
486    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
487        match parse_redis_url(&self) {
488            Some(u) => u.into_connection_info(),
489            None => fail!((ErrorKind::InvalidClientConfig, "Redis URL did not parse")),
490        }
491    }
492}
493
494fn parse_protocol(query: &HashMap<Cow<str>, Cow<str>>) -> RedisResult<ProtocolVersion> {
495    Ok(match query.get("protocol") {
496        Some(protocol) => {
497            if protocol == "2" || protocol == "resp2" {
498                ProtocolVersion::RESP2
499            } else if protocol == "3" || protocol == "resp3" {
500                ProtocolVersion::RESP3
501            } else {
502                fail!((
503                    ErrorKind::InvalidClientConfig,
504                    "Invalid protocol version",
505                    protocol.to_string()
506                ))
507            }
508        }
509        None => ProtocolVersion::RESP2,
510    })
511}
512
/// Returns `true` when `address` is exactly the IPv4 (`0.0.0.0`) or IPv6 (`::`)
/// wildcard address as text.
#[inline]
pub(crate) fn is_wildcard_address(address: &str) -> bool {
    matches!(address, "0.0.0.0" | "::")
}
517
518fn url_to_tcp_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
519    let host = match url.host() {
520        Some(host) => {
521            // Here we manually match host's enum arms and call their to_string().
522            // Because url.host().to_string() will add `[` and `]` for ipv6:
523            // https://docs.rs/url/latest/src/url/host.rs.html#170
524            // And these brackets will break host.parse::<Ipv6Addr>() when
525            // `client.open()` - `ActualConnection::new()` - `addr.to_socket_addrs()`:
526            // https://doc.rust-lang.org/src/std/net/addr.rs.html#963
527            // https://doc.rust-lang.org/src/std/net/parser.rs.html#158
528            // IpAddr string with brackets can ONLY parse to SocketAddrV6:
529            // https://doc.rust-lang.org/src/std/net/parser.rs.html#255
530            // But if we call Ipv6Addr.to_string directly, it follows rfc5952 without brackets:
531            // https://doc.rust-lang.org/src/std/net/ip.rs.html#1755
532            let host_str = match host {
533                url::Host::Domain(path) => path.to_string(),
534                url::Host::Ipv4(v4) => v4.to_string(),
535                url::Host::Ipv6(v6) => v6.to_string(),
536            };
537
538            if is_wildcard_address(&host_str) {
539                return Err(RedisError::from((
540                    ErrorKind::InvalidClientConfig,
541                    "Cannot connect to a wildcard address (0.0.0.0 or ::)",
542                )));
543            }
544            host_str
545        }
546        None => fail!((ErrorKind::InvalidClientConfig, "Missing hostname")),
547    };
548    let port = url.port().unwrap_or(DEFAULT_PORT);
549    let addr = if url.scheme() == "rediss" || url.scheme() == "valkeys" {
550        #[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
551        {
552            match url.fragment() {
553                Some("insecure") => ConnectionAddr::TcpTls {
554                    host,
555                    port,
556                    insecure: true,
557                    tls_params: None,
558                },
559                Some(_) => fail!((
560                    ErrorKind::InvalidClientConfig,
561                    "only #insecure is supported as URL fragment"
562                )),
563                _ => ConnectionAddr::TcpTls {
564                    host,
565                    port,
566                    insecure: false,
567                    tls_params: None,
568                },
569            }
570        }
571
572        #[cfg(not(any(feature = "tls-native-tls", feature = "tls-rustls")))]
573        fail!((
574            ErrorKind::InvalidClientConfig,
575            "can't connect with TLS, the feature is not enabled"
576        ));
577    } else {
578        ConnectionAddr::Tcp(host, port)
579    };
580    let query: HashMap<_, _> = url.query_pairs().collect();
581    Ok(ConnectionInfo {
582        addr,
583        redis: RedisConnectionInfo {
584            db: match url.path().trim_matches('/') {
585                "" => 0,
586                path => path.parse::<i64>().map_err(|_| -> RedisError {
587                    (ErrorKind::InvalidClientConfig, "Invalid database number").into()
588                })?,
589            },
590            username: if url.username().is_empty() {
591                None
592            } else {
593                match percent_encoding::percent_decode(url.username().as_bytes()).decode_utf8() {
594                    Ok(decoded) => Some(decoded.into()),
595                    Err(_) => fail!((
596                        ErrorKind::InvalidClientConfig,
597                        "Username is not valid UTF-8 string"
598                    )),
599                }
600            },
601            password: match url.password() {
602                Some(pw) => match percent_encoding::percent_decode(pw.as_bytes()).decode_utf8() {
603                    Ok(decoded) => Some(decoded.into()),
604                    Err(_) => fail!((
605                        ErrorKind::InvalidClientConfig,
606                        "Password is not valid UTF-8 string"
607                    )),
608                },
609                None => None,
610            },
611            protocol: parse_protocol(&query)?,
612            skip_set_lib_name: false,
613            lib_name: None,
614            lib_ver: None,
615        },
616        tcp_settings: TcpSettings::default(),
617    })
618}
619
620#[cfg(unix)]
621fn url_to_unix_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
622    let query: HashMap<_, _> = url.query_pairs().collect();
623    Ok(ConnectionInfo {
624        addr: ConnectionAddr::Unix(url.to_file_path().map_err(|_| -> RedisError {
625            (ErrorKind::InvalidClientConfig, "Missing path").into()
626        })?),
627        redis: RedisConnectionInfo {
628            db: match query.get("db") {
629                Some(db) => db.parse::<i64>().map_err(|_| -> RedisError {
630                    (ErrorKind::InvalidClientConfig, "Invalid database number").into()
631                })?,
632
633                None => 0,
634            },
635            username: query.get("user").map(|username| username.as_ref().into()),
636            password: query.get("pass").map(|password| password.as_ref().into()),
637            protocol: parse_protocol(&query)?,
638            skip_set_lib_name: false,
639            lib_name: None,
640            lib_ver: None,
641        },
642        tcp_settings: TcpSettings::default(),
643    })
644}
645
/// Fallback for platforms without unix sockets: always fails with an
/// `InvalidClientConfig` error, regardless of the URL passed in.
#[cfg(not(unix))]
fn url_to_unix_connection_info(_: url::Url) -> RedisResult<ConnectionInfo> {
    fail!((
        ErrorKind::InvalidClientConfig,
        "Unix sockets are not available on this platform."
    ));
}
653
654impl IntoConnectionInfo for url::Url {
655    fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
656        match self.scheme() {
657            "redis" | "rediss" | "valkey" | "valkeys" => url_to_tcp_connection_info(self),
658            "unix" | "redis+unix" | "valkey+unix" => url_to_unix_connection_info(self),
659            _ => fail!((
660                ErrorKind::InvalidClientConfig,
661                "URL provided is not a redis URL"
662            )),
663        }
664    }
665}
666
/// Plain TCP transport: the stream plus an `open` flag.
// NOTE(review): `open` presumably records whether the connection is still
// usable — confirm against the code that updates it.
struct TcpConnection {
    /// The underlying TCP stream.
    reader: TcpStream,
    /// See the review note on the struct.
    open: bool,
}
671
/// TLS-over-TCP transport using `native_tls`; compiled only when
/// `tls-native-tls` is enabled and `tls-rustls` is not.
// NOTE(review): `open` presumably records whether the connection is still
// usable — confirm against the code that updates it.
#[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
struct TcpNativeTlsConnection {
    /// The TLS stream wrapping the TCP stream.
    reader: TlsStream<TcpStream>,
    /// See the review note on the struct.
    open: bool,
}
677
/// TLS-over-TCP transport using `rustls`; compiled only with `tls-rustls`.
// NOTE(review): `open` presumably records whether the connection is still
// usable — confirm against the code that updates it.
#[cfg(feature = "tls-rustls")]
struct TcpRustlsConnection {
    /// The rustls client connection together with the TCP stream it owns.
    reader: StreamOwned<rustls::ClientConnection, TcpStream>,
    /// See the review note on the struct.
    open: bool,
}
683
/// Unix domain socket transport; compiled only on unix targets.
// NOTE(review): `open` presumably records whether the connection is still
// usable — confirm against the code that updates it.
#[cfg(unix)]
struct UnixConnection {
    /// The underlying unix socket stream.
    sock: UnixStream,
    /// See the review note on the struct.
    open: bool,
}
689
/// The concrete transport behind a connection.
///
/// Which variants exist depends on the target platform and on the enabled
/// TLS features; at most one of the two TLS variants is compiled in.
enum ActualConnection {
    /// Plain TCP.
    Tcp(TcpConnection),
    /// TLS over TCP via `native_tls` (boxed); only when `tls-native-tls` is
    /// enabled and `tls-rustls` is not.
    #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
    TcpNativeTls(Box<TcpNativeTlsConnection>),
    /// TLS over TCP via `rustls` (boxed); only with `tls-rustls`.
    #[cfg(feature = "tls-rustls")]
    TcpRustls(Box<TcpRustlsConnection>),
    /// Unix domain socket; only on unix targets.
    #[cfg(unix)]
    Unix(UnixConnection),
}
699
/// Insecure `ServerCertVerifier` for rustls that accepts every server
/// certificate and every handshake signature without checking them.
///
/// Only compiled with the `tls-rustls-insecure` feature.
#[cfg(feature = "tls-rustls-insecure")]
struct NoCertificateVerification {
    /// Signature algorithms reported by `supported_verify_schemes`.
    supported: rustls::crypto::WebPkiSupportedAlgorithms,
}
704
// WARNING: every verification method below succeeds unconditionally and
// ignores all of its arguments. A connection using this verifier has no
// protection against man-in-the-middle attacks.
#[cfg(feature = "tls-rustls-insecure")]
impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
    /// Accepts any server certificate without inspecting it.
    fn verify_server_cert(
        &self,
        _end_entity: &rustls::pki_types::CertificateDer<'_>,
        _intermediates: &[rustls::pki_types::CertificateDer<'_>],
        _server_name: &rustls::pki_types::ServerName<'_>,
        _ocsp_response: &[u8],
        _now: rustls::pki_types::UnixTime,
    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
        Ok(rustls::client::danger::ServerCertVerified::assertion())
    }

    /// Accepts any TLS 1.2 handshake signature without checking it.
    fn verify_tls12_signature(
        &self,
        _message: &[u8],
        _cert: &rustls::pki_types::CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
    }

    /// Accepts any TLS 1.3 handshake signature without checking it.
    fn verify_tls13_signature(
        &self,
        _message: &[u8],
        _cert: &rustls::pki_types::CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
    }

    /// Reports the schemes of the stored `supported` algorithm set.
    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
        self.supported.supported_schemes()
    }
}
740
#[cfg(feature = "tls-rustls-insecure")]
impl fmt::Debug for NoCertificateVerification {
    /// Prints only the type name; the `supported` field is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("NoCertificateVerification")
    }
}
747
/// Insecure `ServerCertVerifier` for rustls that implements `danger_accept_invalid_hostnames`.
///
/// It delegates to the wrapped WebPKI verifier and turns hostname-mismatch
/// errors (see `is_hostname_error`) into success; all other errors are kept.
#[cfg(feature = "tls-rustls-insecure")]
#[derive(Debug)]
struct AcceptInvalidHostnamesCertVerifier {
    /// The verifier that performs the actual checks.
    inner: Arc<rustls::client::WebPkiServerVerifier>,
}
754
/// Returns `true` when `err` is an invalid-certificate error caused by the
/// certificate not being valid for the requested name (`NotValidForName` or
/// `NotValidForNameContext`).
#[cfg(feature = "tls-rustls-insecure")]
fn is_hostname_error(err: &rustls::Error) -> bool {
    let rustls::Error::InvalidCertificate(cert_error) = err else {
        return false;
    };
    matches!(
        cert_error,
        rustls::CertificateError::NotValidForName
            | rustls::CertificateError::NotValidForNameContext { .. }
    )
}
765
// Each verification method delegates to the wrapped verifier and converts a
// hostname-mismatch error into success; every other result is passed through
// untouched.
#[cfg(feature = "tls-rustls-insecure")]
impl rustls::client::danger::ServerCertVerifier for AcceptInvalidHostnamesCertVerifier {
    /// Verifies the server certificate, tolerating hostname mismatches.
    fn verify_server_cert(
        &self,
        end_entity: &rustls::pki_types::CertificateDer<'_>,
        intermediates: &[rustls::pki_types::CertificateDer<'_>],
        server_name: &rustls::pki_types::ServerName<'_>,
        ocsp_response: &[u8],
        now: rustls::pki_types::UnixTime,
    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
        let outcome = self.inner.verify_server_cert(
            end_entity,
            intermediates,
            server_name,
            ocsp_response,
            now,
        );
        match outcome {
            Err(err) if is_hostname_error(&err) => {
                Ok(rustls::client::danger::ServerCertVerified::assertion())
            }
            other => other,
        }
    }

    /// Verifies a TLS 1.2 handshake signature, tolerating hostname mismatches.
    fn verify_tls12_signature(
        &self,
        message: &[u8],
        cert: &rustls::pki_types::CertificateDer<'_>,
        dss: &rustls::DigitallySignedStruct,
    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        match self.inner.verify_tls12_signature(message, cert, dss) {
            Err(err) if is_hostname_error(&err) => {
                Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
            }
            other => other,
        }
    }

    /// Verifies a TLS 1.3 handshake signature, tolerating hostname mismatches.
    fn verify_tls13_signature(
        &self,
        message: &[u8],
        cert: &rustls::pki_types::CertificateDer<'_>,
        dss: &rustls::DigitallySignedStruct,
    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        match self.inner.verify_tls13_signature(message, cert, dss) {
            Err(err) if is_hostname_error(&err) => {
                Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
            }
            other => other,
        }
    }

    /// Reports the signature schemes of the wrapped verifier.
    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
        self.inner.supported_verify_schemes()
    }
}
825
/// Represents a stateful redis connection.
///
/// The underlying transport is plain TCP, TLS over TCP, or (on unix) a unix
/// socket, depending on the address it was created from.
pub struct Connection {
    /// The underlying transport.
    con: ActualConnection,
    /// Parser for server replies.
    parser: Parser,
    /// Database number associated with this connection.
    db: i64,

    /// Flag indicating whether the connection was left in the PubSub state after dropping `PubSub`.
    ///
    /// This flag is checked when attempting to send a command, and if it's raised, we attempt to
    /// exit the pubsub state before executing the new request.
    pubsub: bool,

    /// Field indicating which protocol to use for server communications.
    protocol: ProtocolVersion,

    /// This is used to manage Push messages in RESP3 mode.
    push_sender: Option<SyncPushSender>,

    /// The number of messages that are expected to be returned from the server,
    /// but the user no longer waits for - answers for requests that already returned a transient error.
    messages_to_skip: usize,
}
848
/// Represents a RESP2 pubsub connection.
///
/// If you're using a DB that supports RESP3, consider using a regular connection and setting a push sender on it using [Connection::set_push_sender].
pub struct PubSub<'a> {
    /// The connection this pubsub handle borrows mutably for its whole lifetime.
    con: &'a mut Connection,
    /// Queue of messages held by this handle.
    // NOTE(review): presumably messages received but not yet handed to the
    // caller — confirm against the code that fills and drains it.
    waiting_messages: VecDeque<Msg>,
}
856
/// Represents a pubsub message.
#[derive(Debug, Clone)]
pub struct Msg {
    /// The message payload, as a raw redis value.
    payload: Value,
    /// The channel of the message, as a raw redis value.
    channel: Value,
    /// Optional pattern, as a raw redis value.
    // NOTE(review): presumably set only for messages delivered through a
    // pattern subscription — confirm against the code that builds `Msg`.
    pattern: Option<Value>,
}
864
865impl ActualConnection {
866    pub fn new(
867        addr: &ConnectionAddr,
868        timeout: Option<Duration>,
869        tcp_settings: &TcpSettings,
870    ) -> RedisResult<ActualConnection> {
871        Ok(match *addr {
872            ConnectionAddr::Tcp(ref host, ref port) => {
873                if is_wildcard_address(host) {
874                    fail!((
875                        ErrorKind::InvalidClientConfig,
876                        "Cannot connect to a wildcard address (0.0.0.0 or ::)"
877                    ));
878                }
879                let addr = (host.as_str(), *port);
880                let tcp = match timeout {
881                    None => connect_tcp(addr, tcp_settings)?,
882                    Some(timeout) => {
883                        let mut tcp = None;
884                        let mut last_error = None;
885                        for addr in addr.to_socket_addrs()? {
886                            match connect_tcp_timeout(&addr, timeout, tcp_settings) {
887                                Ok(l) => {
888                                    tcp = Some(l);
889                                    break;
890                                }
891                                Err(e) => {
892                                    last_error = Some(e);
893                                }
894                            };
895                        }
896                        match (tcp, last_error) {
897                            (Some(tcp), _) => tcp,
898                            (None, Some(e)) => {
899                                fail!(e);
900                            }
901                            (None, None) => {
902                                fail!((
903                                    ErrorKind::InvalidClientConfig,
904                                    "could not resolve to any addresses"
905                                ));
906                            }
907                        }
908                    }
909                };
910                ActualConnection::Tcp(TcpConnection {
911                    reader: tcp,
912                    open: true,
913                })
914            }
915            #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
916            ConnectionAddr::TcpTls {
917                ref host,
918                port,
919                insecure,
920                ref tls_params,
921            } => {
922                let tls_connector = if insecure {
923                    TlsConnector::builder()
924                        .danger_accept_invalid_certs(true)
925                        .danger_accept_invalid_hostnames(true)
926                        .use_sni(false)
927                        .build()?
928                } else if let Some(params) = tls_params {
929                    TlsConnector::builder()
930                        .danger_accept_invalid_hostnames(params.danger_accept_invalid_hostnames)
931                        .build()?
932                } else {
933                    TlsConnector::new()?
934                };
935                let addr = (host.as_str(), port);
936                let tls = match timeout {
937                    None => {
938                        let tcp = connect_tcp(addr, tcp_settings)?;
939                        match tls_connector.connect(host, tcp) {
940                            Ok(res) => res,
941                            Err(e) => {
942                                fail!((ErrorKind::Io, "SSL Handshake error", e.to_string()));
943                            }
944                        }
945                    }
946                    Some(timeout) => {
947                        let mut tcp = None;
948                        let mut last_error = None;
949                        for addr in (host.as_str(), port).to_socket_addrs()? {
950                            match connect_tcp_timeout(&addr, timeout, tcp_settings) {
951                                Ok(l) => {
952                                    tcp = Some(l);
953                                    break;
954                                }
955                                Err(e) => {
956                                    last_error = Some(e);
957                                }
958                            };
959                        }
960                        match (tcp, last_error) {
961                            (Some(tcp), _) => tls_connector.connect(host, tcp).unwrap(),
962                            (None, Some(e)) => {
963                                fail!(e);
964                            }
965                            (None, None) => {
966                                fail!((
967                                    ErrorKind::InvalidClientConfig,
968                                    "could not resolve to any addresses"
969                                ));
970                            }
971                        }
972                    }
973                };
974                ActualConnection::TcpNativeTls(Box::new(TcpNativeTlsConnection {
975                    reader: tls,
976                    open: true,
977                }))
978            }
979            #[cfg(feature = "tls-rustls")]
980            ConnectionAddr::TcpTls {
981                ref host,
982                port,
983                insecure,
984                ref tls_params,
985            } => {
986                let host: &str = host;
987                let config = create_rustls_config(insecure, tls_params.clone())?;
988                let conn = rustls::ClientConnection::new(
989                    Arc::new(config),
990                    rustls::pki_types::ServerName::try_from(host)?.to_owned(),
991                )?;
992                let reader = match timeout {
993                    None => {
994                        let tcp = connect_tcp((host, port), tcp_settings)?;
995                        StreamOwned::new(conn, tcp)
996                    }
997                    Some(timeout) => {
998                        let mut tcp = None;
999                        let mut last_error = None;
1000                        for addr in (host, port).to_socket_addrs()? {
1001                            match connect_tcp_timeout(&addr, timeout, tcp_settings) {
1002                                Ok(l) => {
1003                                    tcp = Some(l);
1004                                    break;
1005                                }
1006                                Err(e) => {
1007                                    last_error = Some(e);
1008                                }
1009                            };
1010                        }
1011                        match (tcp, last_error) {
1012                            (Some(tcp), _) => StreamOwned::new(conn, tcp),
1013                            (None, Some(e)) => {
1014                                fail!(e);
1015                            }
1016                            (None, None) => {
1017                                fail!((
1018                                    ErrorKind::InvalidClientConfig,
1019                                    "could not resolve to any addresses"
1020                                ));
1021                            }
1022                        }
1023                    }
1024                };
1025
1026                ActualConnection::TcpRustls(Box::new(TcpRustlsConnection { reader, open: true }))
1027            }
1028            #[cfg(not(any(feature = "tls-native-tls", feature = "tls-rustls")))]
1029            ConnectionAddr::TcpTls { .. } => {
1030                fail!((
1031                    ErrorKind::InvalidClientConfig,
1032                    "Cannot connect to TCP with TLS without the tls feature"
1033                ));
1034            }
1035            #[cfg(unix)]
1036            ConnectionAddr::Unix(ref path) => ActualConnection::Unix(UnixConnection {
1037                sock: UnixStream::connect(path)?,
1038                open: true,
1039            }),
1040            #[cfg(not(unix))]
1041            ConnectionAddr::Unix(ref _path) => {
1042                fail!((
1043                    ErrorKind::InvalidClientConfig,
1044                    "Cannot connect to unix sockets \
1045                     on this platform"
1046                ));
1047            }
1048        })
1049    }
1050
1051    pub fn send_bytes(&mut self, bytes: &[u8]) -> RedisResult<Value> {
1052        match *self {
1053            ActualConnection::Tcp(ref mut connection) => {
1054                let res = connection.reader.write_all(bytes).map_err(RedisError::from);
1055                match res {
1056                    Err(e) => {
1057                        if e.is_unrecoverable_error() {
1058                            connection.open = false;
1059                        }
1060                        Err(e)
1061                    }
1062                    Ok(_) => Ok(Value::Okay),
1063                }
1064            }
1065            #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
1066            ActualConnection::TcpNativeTls(ref mut connection) => {
1067                let res = connection.reader.write_all(bytes).map_err(RedisError::from);
1068                match res {
1069                    Err(e) => {
1070                        if e.is_unrecoverable_error() {
1071                            connection.open = false;
1072                        }
1073                        Err(e)
1074                    }
1075                    Ok(_) => Ok(Value::Okay),
1076                }
1077            }
1078            #[cfg(feature = "tls-rustls")]
1079            ActualConnection::TcpRustls(ref mut connection) => {
1080                let res = connection.reader.write_all(bytes).map_err(RedisError::from);
1081                match res {
1082                    Err(e) => {
1083                        if e.is_unrecoverable_error() {
1084                            connection.open = false;
1085                        }
1086                        Err(e)
1087                    }
1088                    Ok(_) => Ok(Value::Okay),
1089                }
1090            }
1091            #[cfg(unix)]
1092            ActualConnection::Unix(ref mut connection) => {
1093                let result = connection.sock.write_all(bytes).map_err(RedisError::from);
1094                match result {
1095                    Err(e) => {
1096                        if e.is_unrecoverable_error() {
1097                            connection.open = false;
1098                        }
1099                        Err(e)
1100                    }
1101                    Ok(_) => Ok(Value::Okay),
1102                }
1103            }
1104        }
1105    }
1106
1107    pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
1108        match *self {
1109            ActualConnection::Tcp(TcpConnection { ref reader, .. }) => {
1110                reader.set_write_timeout(dur)?;
1111            }
1112            #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
1113            ActualConnection::TcpNativeTls(ref boxed_tls_connection) => {
1114                let reader = &(boxed_tls_connection.reader);
1115                reader.get_ref().set_write_timeout(dur)?;
1116            }
1117            #[cfg(feature = "tls-rustls")]
1118            ActualConnection::TcpRustls(ref boxed_tls_connection) => {
1119                let reader = &(boxed_tls_connection.reader);
1120                reader.get_ref().set_write_timeout(dur)?;
1121            }
1122            #[cfg(unix)]
1123            ActualConnection::Unix(UnixConnection { ref sock, .. }) => {
1124                sock.set_write_timeout(dur)?;
1125            }
1126        }
1127        Ok(())
1128    }
1129
1130    pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
1131        match *self {
1132            ActualConnection::Tcp(TcpConnection { ref reader, .. }) => {
1133                reader.set_read_timeout(dur)?;
1134            }
1135            #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
1136            ActualConnection::TcpNativeTls(ref boxed_tls_connection) => {
1137                let reader = &(boxed_tls_connection.reader);
1138                reader.get_ref().set_read_timeout(dur)?;
1139            }
1140            #[cfg(feature = "tls-rustls")]
1141            ActualConnection::TcpRustls(ref boxed_tls_connection) => {
1142                let reader = &(boxed_tls_connection.reader);
1143                reader.get_ref().set_read_timeout(dur)?;
1144            }
1145            #[cfg(unix)]
1146            ActualConnection::Unix(UnixConnection { ref sock, .. }) => {
1147                sock.set_read_timeout(dur)?;
1148            }
1149        }
1150        Ok(())
1151    }
1152
1153    pub fn is_open(&self) -> bool {
1154        match *self {
1155            ActualConnection::Tcp(TcpConnection { open, .. }) => open,
1156            #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
1157            ActualConnection::TcpNativeTls(ref boxed_tls_connection) => boxed_tls_connection.open,
1158            #[cfg(feature = "tls-rustls")]
1159            ActualConnection::TcpRustls(ref boxed_tls_connection) => boxed_tls_connection.open,
1160            #[cfg(unix)]
1161            ActualConnection::Unix(UnixConnection { open, .. }) => open,
1162        }
1163    }
1164}
1165
/// Builds the rustls client configuration used for TLS connections.
///
/// * `insecure` - when `true`, certificate verification is replaced by
///   `NoCertificateVerification` and SNI is disabled. This requires the `tls-rustls-insecure`
///   feature; without it an `InvalidClientConfig` error is returned.
/// * `tls_params` - optional parameters: a root certificate store that replaces the default
///   one, client certificate/key for client authentication, and the
///   `danger_accept_invalid_hostnames` flag.
///
/// The default root store starts out empty and is filled from `webpki_roots` or from the
/// platform's native certificates, depending on the enabled features.
///
/// # Errors
///
/// Returns an error when native certificates fail to load or cannot be added to the store,
/// when the client certificate/key are rejected, when an insecure mode is requested without
/// the `tls-rustls-insecure` feature, or when no default crypto provider is available for the
/// insecure verifier.
#[cfg(feature = "tls-rustls")]
pub(crate) fn create_rustls_config(
    insecure: bool,
    tls_params: Option<TlsConnParams>,
) -> RedisResult<rustls::ClientConfig> {
    // `mut` is unused in feature combinations where neither block below is compiled in.
    #[allow(unused_mut)]
    let mut root_store = RootCertStore::empty();
    #[cfg(feature = "tls-rustls-webpki-roots")]
    root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
    #[cfg(all(
        feature = "tls-rustls",
        not(feature = "tls-native-tls"),
        not(feature = "tls-rustls-webpki-roots")
    ))]
    {
        let mut certificate_result = load_native_certs();
        // Any reported loading error aborts the setup, even if some certificates were loaded.
        if let Some(error) = certificate_result.errors.pop() {
            return Err(error.into());
        }
        for cert in certificate_result.certs {
            root_store.add(cert)?;
        }
    }

    let config = rustls::ClientConfig::builder();
    let config = if let Some(tls_params) = tls_params {
        // A caller-provided root store takes precedence over the default one built above.
        let root_cert_store = tls_params.root_cert_store.unwrap_or(root_store);
        // Cloned because the store is needed again below for the hostname-ignoring verifier.
        let config_builder = config.with_root_certificates(root_cert_store.clone());

        let config_builder = if let Some(ClientTlsParams {
            client_cert_chain: client_cert,
            client_key,
        }) = tls_params.client_tls_params
        {
            config_builder
                .with_client_auth_cert(client_cert, client_key)
                .map_err(|err| {
                    RedisError::from((
                        ErrorKind::InvalidClientConfig,
                        "Unable to build client with TLS parameters provided.",
                        err.to_string(),
                    ))
                })?
        } else {
            config_builder.with_no_client_auth()
        };

        // Implement `danger_accept_invalid_hostnames`.
        //
        // The strange cfg here is to handle a specific unusual combination of features: if
        // `tls-native-tls` and `tls-rustls` are enabled, but `tls-rustls-insecure` is not, and the
        // application tries to use the danger flag.
        #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
        let config_builder = if !insecure && tls_params.danger_accept_invalid_hostnames {
            #[cfg(not(feature = "tls-rustls-insecure"))]
            {
                // This code should not enable an insecure mode if the `insecure` feature is not
                // set, but it shouldn't silently ignore the flag either. So return an error.
                fail!((
                    ErrorKind::InvalidClientConfig,
                    "Cannot create insecure client via danger_accept_invalid_hostnames without tls-rustls-insecure feature"
                ));
            }

            #[cfg(feature = "tls-rustls-insecure")]
            {
                // Wrap the regular WebPKI verifier so that everything except the hostname
                // check is still enforced.
                let mut config = config_builder;
                config.dangerous().set_certificate_verifier(Arc::new(
                    AcceptInvalidHostnamesCertVerifier {
                        inner: rustls::client::WebPkiServerVerifier::builder(Arc::new(
                            root_cert_store,
                        ))
                        .build()
                        .map_err(|err| rustls::Error::from(rustls::OtherError(Arc::new(err))))?,
                    },
                ));
                config
            }
        } else {
            config_builder
        };

        config_builder
    } else {
        config
            .with_root_certificates(root_store)
            .with_no_client_auth()
    };

    // The second tuple element is a compile-time constant telling whether the insecure feature
    // is available in this build.
    match (insecure, cfg!(feature = "tls-rustls-insecure")) {
        #[cfg(feature = "tls-rustls-insecure")]
        (true, true) => {
            let mut config = config;
            config.enable_sni = false;
            let Some(crypto_provider) = rustls::crypto::CryptoProvider::get_default() else {
                return Err(RedisError::from((
                    ErrorKind::InvalidClientConfig,
                    "No crypto provider available for rustls",
                )));
            };
            config
                .dangerous()
                .set_certificate_verifier(Arc::new(NoCertificateVerification {
                    supported: crypto_provider.signature_verification_algorithms,
                }));

            Ok(config)
        }
        (true, false) => {
            fail!((
                ErrorKind::InvalidClientConfig,
                "Cannot create insecure client without tls-rustls-insecure feature"
            ));
        }
        _ => Ok(config),
    }
}
1283
1284pub(crate) fn authenticate_cmd(username: Option<&str>, password: &str) -> Cmd {
1285    let mut command = cmd("AUTH");
1286
1287    if let Some(username) = &username {
1288        command.arg(username);
1289    }
1290
1291    command.arg(password);
1292    command
1293}
1294
1295pub fn connect(
1296    connection_info: &ConnectionInfo,
1297    timeout: Option<Duration>,
1298) -> RedisResult<Connection> {
1299    let start = Instant::now();
1300    let con: ActualConnection = ActualConnection::new(
1301        &connection_info.addr,
1302        timeout,
1303        &connection_info.tcp_settings,
1304    )?;
1305
1306    // we temporarily set the timeout, and will remove it after finishing setup.
1307    let remaining_timeout = timeout.and_then(|timeout| timeout.checked_sub(start.elapsed()));
1308    // TLS could run logic that doesn't contain a timeout, and should fail if it takes too long.
1309    if timeout.is_some() && remaining_timeout.is_none() {
1310        return Err(RedisError::from(std::io::Error::new(
1311            std::io::ErrorKind::TimedOut,
1312            "Connection timed out",
1313        )));
1314    }
1315    con.set_read_timeout(remaining_timeout)?;
1316    con.set_write_timeout(remaining_timeout)?;
1317
1318    let con = setup_connection(
1319        con,
1320        &connection_info.redis,
1321        #[cfg(feature = "cache-aio")]
1322        None,
1323    )?;
1324
1325    // remove the temporary timeout.
1326    con.set_read_timeout(None)?;
1327    con.set_write_timeout(None)?;
1328
1329    Ok(con)
1330}
1331
/// Positions of the setup commands inside the pipeline built by `connection_setup_pipeline`.
///
/// An index is `None` when the corresponding command was not added to the pipeline.
pub(crate) struct ConnectionSetupComponents {
    /// Index of the command produced by `resp3_hello`, used when the protocol supports RESP3.
    resp3_auth_cmd_idx: Option<usize>,
    /// Index of the `AUTH` command, used for non-RESP3 connections that have a password.
    resp2_auth_cmd_idx: Option<usize>,
    /// Index of the `SELECT` command, present when a non-zero database is configured.
    select_cmd_idx: Option<usize>,
    /// Index of the `CLIENT TRACKING` command, present when a cache configuration is given.
    #[cfg(feature = "cache-aio")]
    cache_cmd_idx: Option<usize>,
}
1339
1340pub(crate) fn connection_setup_pipeline(
1341    connection_info: &RedisConnectionInfo,
1342    check_username: bool,
1343    #[cfg(feature = "cache-aio")] cache_config: Option<crate::caching::CacheConfig>,
1344) -> (crate::Pipeline, ConnectionSetupComponents) {
1345    let mut pipeline = pipe();
1346    let (authenticate_with_resp3_cmd_index, authenticate_with_resp2_cmd_index) =
1347        if connection_info.protocol.supports_resp3() {
1348            pipeline.add_command(resp3_hello(connection_info));
1349            (Some(0), None)
1350        } else if let Some(password) = connection_info.password.as_ref() {
1351            pipeline.add_command(authenticate_cmd(
1352                check_username.then(|| connection_info.username()).flatten(),
1353                password,
1354            ));
1355            (None, Some(0))
1356        } else {
1357            (None, None)
1358        };
1359
1360    let select_db_cmd_index = (connection_info.db != 0)
1361        .then(|| pipeline.len())
1362        .inspect(|_| {
1363            pipeline.cmd("SELECT").arg(connection_info.db);
1364        });
1365
1366    #[cfg(feature = "cache-aio")]
1367    let cache_cmd_index = cache_config.map(|cache_config| {
1368        pipeline.cmd("CLIENT").arg("TRACKING").arg("ON");
1369        match cache_config.mode {
1370            crate::caching::CacheMode::All => {}
1371            crate::caching::CacheMode::OptIn => {
1372                pipeline.arg("OPTIN");
1373            }
1374        }
1375        pipeline.len() - 1
1376    });
1377
1378    // result is ignored, as per the command's instructions.
1379    // https://redis.io/commands/client-setinfo/
1380    if !connection_info.skip_set_lib_name {
1381        pipeline
1382            .cmd("CLIENT")
1383            .arg("SETINFO")
1384            .arg("LIB-NAME")
1385            .arg(
1386                connection_info
1387                    .lib_name
1388                    .as_ref()
1389                    .map_or(DEFAULT_CLIENT_SETINFO_LIB_NAME, ArcStr::as_str),
1390            )
1391            .ignore();
1392        pipeline
1393            .cmd("CLIENT")
1394            .arg("SETINFO")
1395            .arg("LIB-VER")
1396            .arg(
1397                connection_info
1398                    .lib_ver
1399                    .as_ref()
1400                    .map_or(DEFAULT_CLIENT_SETINFO_LIB_VER, ArcStr::as_str),
1401            )
1402            .ignore();
1403    }
1404
1405    (
1406        pipeline,
1407        ConnectionSetupComponents {
1408            resp3_auth_cmd_idx: authenticate_with_resp3_cmd_index,
1409            resp2_auth_cmd_idx: authenticate_with_resp2_cmd_index,
1410            select_cmd_idx: select_db_cmd_index,
1411            #[cfg(feature = "cache-aio")]
1412            cache_cmd_idx: cache_cmd_index,
1413        },
1414    )
1415}
1416
1417fn check_resp3_auth(result: &Value) -> RedisResult<()> {
1418    if let Value::ServerError(err) = result {
1419        return Err(get_resp3_hello_command_error(err.clone().into()));
1420    }
1421    Ok(())
1422}
1423
/// Outcome of checking the replies to the connection setup pipeline.
#[derive(PartialEq)]
pub(crate) enum AuthResult {
    /// The setup replies were accepted.
    Succeeded,
    /// The server rejected the argument count of `AUTH`; the setup should be retried with the
    /// password only.
    ShouldRetryWithoutUsername,
}
1429
1430fn check_resp2_auth(result: &Value) -> RedisResult<AuthResult> {
1431    let err = match result {
1432        Value::Okay => {
1433            return Ok(AuthResult::Succeeded);
1434        }
1435        Value::ServerError(err) => err,
1436        _ => {
1437            return Err((
1438                ServerErrorKind::ResponseError.into(),
1439                "Redis server refused to authenticate, returns Ok() != Value::Okay",
1440            )
1441                .into());
1442        }
1443    };
1444
1445    let err_msg = err.details().ok_or((
1446        ErrorKind::AuthenticationFailed,
1447        "Password authentication failed",
1448    ))?;
1449    if !err_msg.contains("wrong number of arguments for 'auth' command") {
1450        return Err((
1451            ErrorKind::AuthenticationFailed,
1452            "Password authentication failed",
1453        )
1454            .into());
1455    }
1456    Ok(AuthResult::ShouldRetryWithoutUsername)
1457}
1458
1459fn check_db_select(value: &Value) -> RedisResult<()> {
1460    let Value::ServerError(err) = value else {
1461        return Ok(());
1462    };
1463
1464    match err.details() {
1465        Some(err_msg) => Err((
1466            ServerErrorKind::ResponseError.into(),
1467            "Redis server refused to switch database",
1468            err_msg.to_string(),
1469        )
1470            .into()),
1471        None => Err((
1472            ServerErrorKind::ResponseError.into(),
1473            "Redis server refused to switch database",
1474        )
1475            .into()),
1476    }
1477}
1478
/// Checks the reply to the `CLIENT TRACKING` command: anything other than `Value::Okay` is
/// reported as a response error that includes the debug rendering of the reply.
#[cfg(feature = "cache-aio")]
fn check_caching(result: &Value) -> RedisResult<()> {
    if let Value::Okay = result {
        return Ok(());
    }

    Err((
        ServerErrorKind::ResponseError.into(),
        "Client-side caching returned unknown response",
        format!("{result:?}"),
    )
        .into())
}
1491
1492pub(crate) fn check_connection_setup(
1493    results: Vec<Value>,
1494    ConnectionSetupComponents {
1495        resp3_auth_cmd_idx,
1496        resp2_auth_cmd_idx,
1497        select_cmd_idx,
1498        #[cfg(feature = "cache-aio")]
1499        cache_cmd_idx,
1500    }: ConnectionSetupComponents,
1501) -> RedisResult<AuthResult> {
1502    // can't have both values set
1503    assert!(!(resp2_auth_cmd_idx.is_some() && resp3_auth_cmd_idx.is_some()));
1504
1505    if let Some(index) = resp3_auth_cmd_idx {
1506        let Some(value) = results.get(index) else {
1507            return Err((ErrorKind::Client, "Missing RESP3 auth response").into());
1508        };
1509        check_resp3_auth(value)?;
1510    } else if let Some(index) = resp2_auth_cmd_idx {
1511        let Some(value) = results.get(index) else {
1512            return Err((ErrorKind::Client, "Missing RESP2 auth response").into());
1513        };
1514        if check_resp2_auth(value)? == AuthResult::ShouldRetryWithoutUsername {
1515            return Ok(AuthResult::ShouldRetryWithoutUsername);
1516        }
1517    }
1518
1519    if let Some(index) = select_cmd_idx {
1520        let Some(value) = results.get(index) else {
1521            return Err((ErrorKind::Client, "Missing SELECT DB response").into());
1522        };
1523        check_db_select(value)?;
1524    }
1525
1526    #[cfg(feature = "cache-aio")]
1527    if let Some(index) = cache_cmd_idx {
1528        let Some(value) = results.get(index) else {
1529            return Err((ErrorKind::Client, "Missing Caching response").into());
1530        };
1531        check_caching(value)?;
1532    }
1533
1534    Ok(AuthResult::Succeeded)
1535}
1536
1537fn execute_connection_pipeline(
1538    rv: &mut Connection,
1539    (pipeline, instructions): (crate::Pipeline, ConnectionSetupComponents),
1540) -> RedisResult<AuthResult> {
1541    if pipeline.is_empty() {
1542        return Ok(AuthResult::Succeeded);
1543    }
1544    let results = rv.req_packed_commands(&pipeline.get_packed_pipeline(), 0, pipeline.len())?;
1545
1546    check_connection_setup(results, instructions)
1547}
1548
1549fn setup_connection(
1550    con: ActualConnection,
1551    connection_info: &RedisConnectionInfo,
1552    #[cfg(feature = "cache-aio")] cache_config: Option<crate::caching::CacheConfig>,
1553) -> RedisResult<Connection> {
1554    let mut rv = Connection {
1555        con,
1556        parser: Parser::new(),
1557        db: connection_info.db,
1558        pubsub: false,
1559        protocol: connection_info.protocol,
1560        push_sender: None,
1561        messages_to_skip: 0,
1562    };
1563
1564    if execute_connection_pipeline(
1565        &mut rv,
1566        connection_setup_pipeline(
1567            connection_info,
1568            true,
1569            #[cfg(feature = "cache-aio")]
1570            cache_config,
1571        ),
1572    )? == AuthResult::ShouldRetryWithoutUsername
1573    {
1574        execute_connection_pipeline(
1575            &mut rv,
1576            connection_setup_pipeline(
1577                connection_info,
1578                false,
1579                #[cfg(feature = "cache-aio")]
1580                cache_config,
1581            ),
1582        )?;
1583    }
1584
1585    Ok(rv)
1586}
1587
/// Implements the "stateless" part of the connection interface that is used by the
/// different objects in redis-rs.
///
/// Primarily it obviously applies to the `Connection` object, but some other objects
/// also implement the interface (for instance whole clients or certain redis results).
///
/// Generally clients and connections (as well as redis results of those) implement
/// this trait.  Actual connections provide more functionality which can be used
/// to implement things like `PubSub` but they also can modify the intrinsic
/// state of the TCP connection.  This is not possible with `ConnectionLike`
/// implementors because that functionality is not exposed.
pub trait ConnectionLike {
    /// Sends an already encoded (packed) command into the TCP socket and
    /// reads the single response from it.
    fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value>;

    /// Sends multiple already encoded (packed) commands into the TCP socket
    /// and reads `count` responses from it.  This is used to implement
    /// pipelining.
    /// Important - this function is meant for internal usage, since it's
    /// easy to pass incorrect `offset` & `count` parameters, which might
    /// cause the connection to enter an erroneous state. Users shouldn't
    /// call it, instead using the Pipeline::query function.
    #[doc(hidden)]
    fn req_packed_commands(
        &mut self,
        cmd: &[u8],
        offset: usize,
        count: usize,
    ) -> RedisResult<Vec<Value>>;

    /// Sends a [Cmd] into the TCP socket and reads a single response from it.
    ///
    /// The default implementation packs the command and forwards it to
    /// [`ConnectionLike::req_packed_command`].
    fn req_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
        let pcmd = cmd.get_packed_command();
        self.req_packed_command(&pcmd)
    }

    /// Returns the database this connection is bound to.  Note that this
    /// information might be unreliable because it's initially cached and
    /// also might be incorrect if the connection like object is not
    /// actually connected.
    fn get_db(&self) -> i64;

    /// Does this connection support pipelining?
    ///
    /// The default implementation answers `true`.
    #[doc(hidden)]
    fn supports_pipelining(&self) -> bool {
        true
    }

    /// Check that all connections it has are available (`PING` internally).
    fn check_connection(&mut self) -> bool;

    /// Returns the connection status.
    ///
    /// The connection is open until any `read` call received an
    /// invalid response from the server (most likely a closed or dropped
    /// connection, otherwise a Redis protocol error). When using unix
    /// sockets the connection is open until writing a command failed with a
    /// `BrokenPipe` error.
    fn is_open(&self) -> bool;
}
1649
1650/// A connection is an object that represents a single redis connection.  It
1651/// provides basic support for sending encoded commands into a redis connection
1652/// and to read a response from it.  It's bound to a single database and can
1653/// only be created from the client.
1654///
1655/// You generally do not much with this object other than passing it to
1656/// `Cmd` objects.
1657impl Connection {
1658    /// Sends an already encoded (packed) command into the TCP socket and
1659    /// does not read a response.  This is useful for commands like
1660    /// `MONITOR` which yield multiple items.  This needs to be used with
1661    /// care because it changes the state of the connection.
1662    pub fn send_packed_command(&mut self, cmd: &[u8]) -> RedisResult<()> {
1663        self.send_bytes(cmd)?;
1664        Ok(())
1665    }
1666
1667    /// Fetches a single response from the connection.  This is useful
1668    /// if used in combination with `send_packed_command`.
1669    pub fn recv_response(&mut self) -> RedisResult<Value> {
1670        self.read(true)
1671    }
1672
1673    /// Sets the write timeout for the connection.
1674    ///
1675    /// If the provided value is `None`, then `send_packed_command` call will
1676    /// block indefinitely. It is an error to pass the zero `Duration` to this
1677    /// method.
1678    pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
1679        self.con.set_write_timeout(dur)
1680    }
1681
1682    /// Sets the read timeout for the connection.
1683    ///
1684    /// If the provided value is `None`, then `recv_response` call will
1685    /// block indefinitely. It is an error to pass the zero `Duration` to this
1686    /// method.
1687    pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
1688        self.con.set_read_timeout(dur)
1689    }
1690
1691    /// Creates a [`PubSub`] instance for this connection.
1692    pub fn as_pubsub(&mut self) -> PubSub<'_> {
1693        // NOTE: The pubsub flag is intentionally not raised at this time since
1694        // running commands within the pubsub state should not try and exit from
1695        // the pubsub state.
1696        PubSub::new(self)
1697    }
1698
1699    fn exit_pubsub(&mut self) -> RedisResult<()> {
1700        let res = self.clear_active_subscriptions();
1701        if res.is_ok() {
1702            self.pubsub = false;
1703        } else {
1704            // Raise the pubsub flag to indicate the connection is "stuck" in that state.
1705            self.pubsub = true;
1706        }
1707
1708        res
1709    }
1710
1711    /// Get the inner connection out of a PubSub
1712    ///
1713    /// Any active subscriptions are unsubscribed. In the event of an error, the connection is
1714    /// dropped.
1715    fn clear_active_subscriptions(&mut self) -> RedisResult<()> {
1716        // Responses to unsubscribe commands return in a 3-tuple with values
1717        // ("unsubscribe" or "punsubscribe", name of subscription removed, count of remaining subs).
1718        // The "count of remaining subs" includes both pattern subscriptions and non pattern
1719        // subscriptions. Thus, to accurately drain all unsubscribe messages received from the
1720        // server, both commands need to be executed at once.
1721        {
1722            // Prepare both unsubscribe commands
1723            let unsubscribe = cmd("UNSUBSCRIBE").get_packed_command();
1724            let punsubscribe = cmd("PUNSUBSCRIBE").get_packed_command();
1725
1726            // Execute commands
1727            self.send_bytes(&unsubscribe)?;
1728            self.send_bytes(&punsubscribe)?;
1729        }
1730
1731        // Receive responses
1732        //
1733        // There will be at minimum two responses - 1 for each of punsubscribe and unsubscribe
1734        // commands. There may be more responses if there are active subscriptions. In this case,
1735        // messages are received until the _subscription count_ in the responses reach zero.
1736        let mut received_unsub = false;
1737        let mut received_punsub = false;
1738
1739        loop {
1740            let resp = self.recv_response()?;
1741
1742            match resp {
1743                Value::Push { kind, data } => {
1744                    if data.len() >= 2 {
1745                        if let Value::Int(num) = data[1] {
1746                            if resp3_is_pub_sub_state_cleared(
1747                                &mut received_unsub,
1748                                &mut received_punsub,
1749                                &kind,
1750                                num as isize,
1751                            ) {
1752                                break;
1753                            }
1754                        }
1755                    }
1756                }
1757                Value::ServerError(err) => {
1758                    // a new error behavior, introduced in valkey 8.
1759                    // https://github.com/valkey-io/valkey/pull/759
1760                    if err.kind() == Some(ServerErrorKind::NoSub) {
1761                        if no_sub_err_is_pub_sub_state_cleared(
1762                            &mut received_unsub,
1763                            &mut received_punsub,
1764                            &err,
1765                        ) {
1766                            break;
1767                        } else {
1768                            continue;
1769                        }
1770                    }
1771
1772                    return Err(err.into());
1773                }
1774                Value::Array(vec) => {
1775                    let res: (Vec<u8>, (), isize) = from_redis_value(Value::Array(vec))?;
1776                    if resp2_is_pub_sub_state_cleared(
1777                        &mut received_unsub,
1778                        &mut received_punsub,
1779                        &res.0,
1780                        res.2,
1781                    ) {
1782                        break;
1783                    }
1784                }
1785                _ => {
1786                    return Err((
1787                        ErrorKind::Client,
1788                        "Unexpected unsubscribe response",
1789                        format!("{resp:?}"),
1790                    )
1791                        .into());
1792                }
1793            }
1794        }
1795
1796        // Finally, the connection is back in its normal state since all subscriptions were
1797        // cancelled *and* all unsubscribe messages were received.
1798        Ok(())
1799    }
1800
1801    fn send_push(&self, push: PushInfo) {
1802        if let Some(sender) = &self.push_sender {
1803            let _ = sender.send(push);
1804        }
1805    }
1806
1807    fn try_send(&self, value: &RedisResult<Value>) {
1808        if let Ok(Value::Push { kind, data }) = value {
1809            self.send_push(PushInfo {
1810                kind: kind.clone(),
1811                data: data.clone(),
1812            });
1813        }
1814    }
1815
1816    fn send_disconnect(&self) {
1817        self.send_push(PushInfo::disconnect())
1818    }
1819
1820    fn close_connection(&mut self) {
1821        // Notify the PushManager that the connection was lost
1822        self.send_disconnect();
1823        match self.con {
1824            ActualConnection::Tcp(ref mut connection) => {
1825                let _ = connection.reader.shutdown(net::Shutdown::Both);
1826                connection.open = false;
1827            }
1828            #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
1829            ActualConnection::TcpNativeTls(ref mut connection) => {
1830                let _ = connection.reader.shutdown();
1831                connection.open = false;
1832            }
1833            #[cfg(feature = "tls-rustls")]
1834            ActualConnection::TcpRustls(ref mut connection) => {
1835                let _ = connection.reader.get_mut().shutdown(net::Shutdown::Both);
1836                connection.open = false;
1837            }
1838            #[cfg(unix)]
1839            ActualConnection::Unix(ref mut connection) => {
1840                let _ = connection.sock.shutdown(net::Shutdown::Both);
1841                connection.open = false;
1842            }
1843        }
1844    }
1845
1846    /// Fetches a single message from the connection. If the message is a response,
1847    /// increment `messages_to_skip` if it wasn't received before a timeout.
1848    fn read(&mut self, is_response: bool) -> RedisResult<Value> {
1849        loop {
1850            let result = match self.con {
1851                ActualConnection::Tcp(TcpConnection { ref mut reader, .. }) => {
1852                    self.parser.parse_value(reader)
1853                }
1854                #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
1855                ActualConnection::TcpNativeTls(ref mut boxed_tls_connection) => {
1856                    let reader = &mut boxed_tls_connection.reader;
1857                    self.parser.parse_value(reader)
1858                }
1859                #[cfg(feature = "tls-rustls")]
1860                ActualConnection::TcpRustls(ref mut boxed_tls_connection) => {
1861                    let reader = &mut boxed_tls_connection.reader;
1862                    self.parser.parse_value(reader)
1863                }
1864                #[cfg(unix)]
1865                ActualConnection::Unix(UnixConnection { ref mut sock, .. }) => {
1866                    self.parser.parse_value(sock)
1867                }
1868            };
1869            self.try_send(&result);
1870
1871            let Err(err) = &result else {
1872                if self.messages_to_skip > 0 {
1873                    self.messages_to_skip -= 1;
1874                    continue;
1875                }
1876                return result;
1877            };
1878            let Some(io_error) = err.as_io_error() else {
1879                if self.messages_to_skip > 0 {
1880                    self.messages_to_skip -= 1;
1881                    continue;
1882                }
1883                return result;
1884            };
1885            // shutdown connection on protocol error
1886            if io_error.kind() == io::ErrorKind::UnexpectedEof {
1887                self.close_connection();
1888            } else if is_response {
1889                self.messages_to_skip += 1;
1890            }
1891
1892            return result;
1893        }
1894    }
1895
1896    /// Sets sender channel for push values.
1897    pub fn set_push_sender(&mut self, sender: SyncPushSender) {
1898        self.push_sender = Some(sender);
1899    }
1900
1901    fn send_bytes(&mut self, bytes: &[u8]) -> RedisResult<Value> {
1902        if bytes.is_empty() {
1903            return Err(RedisError::make_empty_command());
1904        }
1905        let result = self.con.send_bytes(bytes);
1906        if self.protocol.supports_resp3() {
1907            if let Err(e) = &result {
1908                if e.is_connection_dropped() {
1909                    self.send_disconnect();
1910                }
1911            }
1912        }
1913        result
1914    }
1915
1916    /// Subscribes to a new channel(s).
1917    ///
1918    /// This only works if the connection was configured with [ProtocolVersion::RESP3] and [Self::set_push_sender].
1919    pub fn subscribe_resp3<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
1920        check_resp3!(self.protocol);
1921        cmd("SUBSCRIBE")
1922            .arg(channel)
1923            .set_no_response(true)
1924            .exec(self)
1925    }
1926
1927    /// Subscribes to new channel(s) with pattern(s).
1928    ///
1929    /// This only works if the connection was configured with [ProtocolVersion::RESP3] and [Self::set_push_sender].
1930    pub fn psubscribe_resp3<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
1931        check_resp3!(self.protocol);
1932        cmd("PSUBSCRIBE")
1933            .arg(pchannel)
1934            .set_no_response(true)
1935            .exec(self)
1936    }
1937
1938    /// Unsubscribes from a channel(s).
1939    ///
1940    /// This only works if the connection was configured with [ProtocolVersion::RESP3] and [Self::set_push_sender].
1941    pub fn unsubscribe_resp3<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
1942        check_resp3!(self.protocol);
1943        cmd("UNSUBSCRIBE")
1944            .arg(channel)
1945            .set_no_response(true)
1946            .exec(self)
1947    }
1948
1949    /// Unsubscribes from channel pattern(s).
1950    ///
1951    /// This only works if the connection was configured with [ProtocolVersion::RESP3] and [Self::set_push_sender].
1952    pub fn punsubscribe_resp3<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
1953        check_resp3!(self.protocol);
1954        cmd("PUNSUBSCRIBE")
1955            .arg(pchannel)
1956            .set_no_response(true)
1957            .exec(self)
1958    }
1959}
1960
1961impl ConnectionLike for Connection {
1962    /// Sends a [Cmd] into the TCP socket and reads a single response from it.
1963    fn req_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
1964        let pcmd = cmd.get_packed_command();
1965        if self.pubsub {
1966            self.exit_pubsub()?;
1967        }
1968
1969        self.send_bytes(&pcmd)?;
1970        if cmd.is_no_response() {
1971            return Ok(Value::Nil);
1972        }
1973        loop {
1974            match self.read(true)? {
1975                Value::Push {
1976                    kind: _kind,
1977                    data: _data,
1978                } => continue,
1979                val => return Ok(val),
1980            }
1981        }
1982    }
1983    fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
1984        if self.pubsub {
1985            self.exit_pubsub()?;
1986        }
1987
1988        self.send_bytes(cmd)?;
1989        loop {
1990            match self.read(true)? {
1991                Value::Push {
1992                    kind: _kind,
1993                    data: _data,
1994                } => continue,
1995                val => return Ok(val),
1996            }
1997        }
1998    }
1999
2000    fn req_packed_commands(
2001        &mut self,
2002        cmd: &[u8],
2003        offset: usize,
2004        count: usize,
2005    ) -> RedisResult<Vec<Value>> {
2006        if self.pubsub {
2007            self.exit_pubsub()?;
2008        }
2009        self.send_bytes(cmd)?;
2010        let mut rv = vec![];
2011        let mut first_err = None;
2012        let mut server_errors = vec![];
2013        let mut count = count;
2014        let mut idx = 0;
2015        while idx < (offset + count) {
2016            // When processing a transaction, some responses may be errors.
2017            // We need to keep processing the rest of the responses in that case,
2018            // so bailing early with `?` would not be correct.
2019            // See: https://github.com/redis-rs/redis-rs/issues/436
2020            let response = self.read(true);
2021            match response {
2022                Ok(Value::ServerError(err)) => {
2023                    if idx < offset {
2024                        server_errors.push((idx - 1, err)); // -1, to offset the added MULTI call.
2025                    } else {
2026                        rv.push(Value::ServerError(err));
2027                    }
2028                }
2029                Ok(item) => {
2030                    // RESP3 can insert push data between command replies
2031                    if let Value::Push {
2032                        kind: _kind,
2033                        data: _data,
2034                    } = item
2035                    {
2036                        // if that is the case we have to extend the loop and handle push data
2037                        count += 1;
2038                    } else if idx >= offset {
2039                        rv.push(item);
2040                    }
2041                }
2042                Err(err) => {
2043                    if first_err.is_none() {
2044                        first_err = Some(err);
2045                    }
2046                }
2047            }
2048            idx += 1;
2049        }
2050
2051        if !server_errors.is_empty() {
2052            return Err(RedisError::make_aborted_transaction(server_errors));
2053        }
2054
2055        first_err.map_or(Ok(rv), Err)
2056    }
2057
2058    fn get_db(&self) -> i64 {
2059        self.db
2060    }
2061
2062    fn check_connection(&mut self) -> bool {
2063        cmd("PING").query::<String>(self).is_ok()
2064    }
2065
2066    fn is_open(&self) -> bool {
2067        self.con.is_open()
2068    }
2069}
2070
2071impl<C, T> ConnectionLike for T
2072where
2073    C: ConnectionLike,
2074    T: DerefMut<Target = C>,
2075{
2076    fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
2077        self.deref_mut().req_packed_command(cmd)
2078    }
2079
2080    fn req_packed_commands(
2081        &mut self,
2082        cmd: &[u8],
2083        offset: usize,
2084        count: usize,
2085    ) -> RedisResult<Vec<Value>> {
2086        self.deref_mut().req_packed_commands(cmd, offset, count)
2087    }
2088
2089    fn req_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
2090        self.deref_mut().req_command(cmd)
2091    }
2092
2093    fn get_db(&self) -> i64 {
2094        self.deref().get_db()
2095    }
2096
2097    fn supports_pipelining(&self) -> bool {
2098        self.deref().supports_pipelining()
2099    }
2100
2101    fn check_connection(&mut self) -> bool {
2102        self.deref_mut().check_connection()
2103    }
2104
2105    fn is_open(&self) -> bool {
2106        self.deref().is_open()
2107    }
2108}
2109
2110/// The pubsub object provides convenient access to the redis pubsub
2111/// system.  Once created you can subscribe and unsubscribe from channels
2112/// and listen in on messages.
2113///
2114/// Example:
2115///
2116/// ```rust,no_run
2117/// # fn do_something() -> redis::RedisResult<()> {
2118/// let client = redis::Client::open("redis://127.0.0.1/")?;
2119/// let mut con = client.get_connection()?;
2120/// let mut pubsub = con.as_pubsub();
2121/// pubsub.subscribe("channel_1")?;
2122/// pubsub.subscribe("channel_2")?;
2123///
2124/// loop {
2125///     let msg = pubsub.get_message()?;
2126///     let payload : String = msg.get_payload()?;
2127///     println!("channel '{}': {}", msg.get_channel_name(), payload);
2128/// }
2129/// # }
2130/// ```
2131impl<'a> PubSub<'a> {
2132    fn new(con: &'a mut Connection) -> Self {
2133        Self {
2134            con,
2135            waiting_messages: VecDeque::new(),
2136        }
2137    }
2138
2139    fn cache_messages_until_received_response(
2140        &mut self,
2141        cmd: &mut Cmd,
2142        is_sub_unsub: bool,
2143    ) -> RedisResult<Value> {
2144        let ignore_response = self.con.protocol.supports_resp3() && is_sub_unsub;
2145        cmd.set_no_response(ignore_response);
2146
2147        self.con.send_packed_command(&cmd.get_packed_command())?;
2148
2149        loop {
2150            let response = self.con.recv_response()?;
2151            if let Some(msg) = Msg::from_value(&response) {
2152                self.waiting_messages.push_back(msg);
2153            } else {
2154                return Ok(response);
2155            }
2156        }
2157    }
2158
2159    /// Subscribes to a new channel(s).
2160    pub fn subscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
2161        self.cache_messages_until_received_response(cmd("SUBSCRIBE").arg(channel), true)?;
2162        Ok(())
2163    }
2164
2165    /// Subscribes to new channel(s) with pattern(s).
2166    pub fn psubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
2167        self.cache_messages_until_received_response(cmd("PSUBSCRIBE").arg(pchannel), true)?;
2168        Ok(())
2169    }
2170
2171    /// Unsubscribes from a channel(s).
2172    pub fn unsubscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
2173        self.cache_messages_until_received_response(cmd("UNSUBSCRIBE").arg(channel), true)?;
2174        Ok(())
2175    }
2176
2177    /// Unsubscribes from channel pattern(s).
2178    pub fn punsubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
2179        self.cache_messages_until_received_response(cmd("PUNSUBSCRIBE").arg(pchannel), true)?;
2180        Ok(())
2181    }
2182
2183    /// Sends a ping with a message to the server
2184    pub fn ping_message<T: FromRedisValue>(&mut self, message: impl ToRedisArgs) -> RedisResult<T> {
2185        Ok(from_redis_value(
2186            self.cache_messages_until_received_response(cmd("PING").arg(message), false)?,
2187        )?)
2188    }
2189    /// Sends a ping to the server
2190    pub fn ping<T: FromRedisValue>(&mut self) -> RedisResult<T> {
2191        Ok(from_redis_value(
2192            self.cache_messages_until_received_response(&mut cmd("PING"), false)?,
2193        )?)
2194    }
2195
2196    /// Fetches the next message from the pubsub connection.  Blocks until
2197    /// a message becomes available.  This currently does not provide a
2198    /// wait not to block :(
2199    ///
2200    /// The message itself is still generic and can be converted into an
2201    /// appropriate type through the helper methods on it.
2202    pub fn get_message(&mut self) -> RedisResult<Msg> {
2203        if let Some(msg) = self.waiting_messages.pop_front() {
2204            return Ok(msg);
2205        }
2206        loop {
2207            if let Some(msg) = Msg::from_owned_value(self.con.read(false)?) {
2208                return Ok(msg);
2209            } else {
2210                continue;
2211            }
2212        }
2213    }
2214
2215    /// Sets the read timeout for the connection.
2216    ///
2217    /// If the provided value is `None`, then `get_message` call will
2218    /// block indefinitely. It is an error to pass the zero `Duration` to this
2219    /// method.
2220    pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
2221        self.con.set_read_timeout(dur)
2222    }
2223}
2224
impl Drop for PubSub<'_> {
    /// Tries to take the borrowed connection out of the pubsub state when the
    /// `PubSub` handle goes away.
    fn drop(&mut self) {
        // `drop` cannot report errors, so the result is discarded. On failure
        // `exit_pubsub` leaves the connection's `pubsub` flag raised, and the
        // next request on the connection attempts the cleanup again.
        let _ = self.con.exit_pubsub();
    }
}
2230
2231/// This holds the data that comes from listening to a pubsub
2232/// connection.  It only contains actual message data.
2233impl Msg {
2234    /// Tries to convert provided [`Value`] into [`Msg`].
2235    pub fn from_value(value: &Value) -> Option<Self> {
2236        Self::from_owned_value(value.clone())
2237    }
2238
2239    /// Tries to convert provided [`Value`] into [`Msg`].
2240    pub fn from_owned_value(value: Value) -> Option<Self> {
2241        let mut pattern = None;
2242        let payload;
2243        let channel;
2244
2245        if let Value::Push { kind, data } = value {
2246            return Self::from_push_info(PushInfo { kind, data });
2247        } else {
2248            let raw_msg: Vec<Value> = from_redis_value(value).ok()?;
2249            let mut iter = raw_msg.into_iter();
2250            let msg_type: String = from_redis_value(iter.next()?).ok()?;
2251            if msg_type == "message" {
2252                channel = iter.next()?;
2253                payload = iter.next()?;
2254            } else if msg_type == "pmessage" {
2255                pattern = Some(iter.next()?);
2256                channel = iter.next()?;
2257                payload = iter.next()?;
2258            } else {
2259                return None;
2260            }
2261        };
2262        Some(Msg {
2263            payload,
2264            channel,
2265            pattern,
2266        })
2267    }
2268
2269    /// Tries to convert provided [`PushInfo`] into [`Msg`].
2270    pub fn from_push_info(push_info: PushInfo) -> Option<Self> {
2271        let mut pattern = None;
2272        let payload;
2273        let channel;
2274
2275        let mut iter = push_info.data.into_iter();
2276        if push_info.kind == PushKind::Message || push_info.kind == PushKind::SMessage {
2277            channel = iter.next()?;
2278            payload = iter.next()?;
2279        } else if push_info.kind == PushKind::PMessage {
2280            pattern = Some(iter.next()?);
2281            channel = iter.next()?;
2282            payload = iter.next()?;
2283        } else {
2284            return None;
2285        }
2286
2287        Some(Msg {
2288            payload,
2289            channel,
2290            pattern,
2291        })
2292    }
2293
2294    /// Returns the channel this message came on.
2295    pub fn get_channel<T: FromRedisValue>(&self) -> RedisResult<T> {
2296        Ok(from_redis_value_ref(&self.channel)?)
2297    }
2298
2299    /// Convenience method to get a string version of the channel.  Unless
2300    /// your channel contains non utf-8 bytes you can always use this
2301    /// method.  If the channel is not a valid string (which really should
2302    /// not happen) then the return value is `"?"`.
2303    pub fn get_channel_name(&self) -> &str {
2304        match self.channel {
2305            Value::BulkString(ref bytes) => from_utf8(bytes).unwrap_or("?"),
2306            _ => "?",
2307        }
2308    }
2309
2310    /// Returns the message's payload in a specific format.
2311    pub fn get_payload<T: FromRedisValue>(&self) -> RedisResult<T> {
2312        Ok(from_redis_value_ref(&self.payload)?)
2313    }
2314
2315    /// Returns the bytes that are the message's payload.  This can be used
2316    /// as an alternative to the `get_payload` function if you are interested
2317    /// in the raw bytes in it.
2318    pub fn get_payload_bytes(&self) -> &[u8] {
2319        match self.payload {
2320            Value::BulkString(ref bytes) => bytes,
2321            _ => b"",
2322        }
2323    }
2324
2325    /// Returns true if the message was constructed from a pattern
2326    /// subscription.
2327    #[allow(clippy::wrong_self_convention)]
2328    pub fn from_pattern(&self) -> bool {
2329        self.pattern.is_some()
2330    }
2331
2332    /// If the message was constructed from a message pattern this can be
2333    /// used to find out which one.  It's recommended to match against
2334    /// an `Option<String>` so that you do not need to use `from_pattern`
2335    /// to figure out if a pattern was set.
2336    pub fn get_pattern<T: FromRedisValue>(&self) -> RedisResult<T> {
2337        Ok(match self.pattern {
2338            None => from_redis_value_ref(&Value::Nil),
2339            Some(ref x) => from_redis_value_ref(x),
2340        }?)
2341    }
2342}
2343
2344/// This function simplifies transaction management slightly.  What it
2345/// does is automatically watching keys and then going into a transaction
2346/// loop util it succeeds.  Once it goes through the results are
2347/// returned.
2348///
2349/// To use the transaction two pieces of information are needed: a list
2350/// of all the keys that need to be watched for modifications and a
2351/// closure with the code that should be execute in the context of the
2352/// transaction.  The closure is invoked with a fresh pipeline in atomic
2353/// mode.  To use the transaction the function needs to return the result
2354/// from querying the pipeline with the connection.
2355///
2356/// The end result of the transaction is then available as the return
2357/// value from the function call.
2358///
2359/// Example:
2360///
2361/// ```rust,no_run
2362/// use redis::Commands;
2363/// # fn do_something() -> redis::RedisResult<()> {
2364/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
2365/// # let mut con = client.get_connection().unwrap();
2366/// let key = "the_key";
2367/// let (new_val,) : (isize,) = redis::transaction(&mut con, &[key], |con, pipe| {
2368///     let old_val : isize = con.get(key)?;
2369///     pipe
2370///         .set(key, old_val + 1).ignore()
2371///         .get(key).query(con)
2372/// })?;
2373/// println!("The incremented number is: {}", new_val);
2374/// # Ok(()) }
2375/// ```
2376pub fn transaction<
2377    C: ConnectionLike,
2378    K: ToRedisArgs,
2379    T,
2380    F: FnMut(&mut C, &mut Pipeline) -> RedisResult<Option<T>>,
2381>(
2382    con: &mut C,
2383    keys: &[K],
2384    func: F,
2385) -> RedisResult<T> {
2386    let mut func = func;
2387    loop {
2388        cmd("WATCH").arg(keys).exec(con)?;
2389        let mut p = pipe();
2390        let response: Option<T> = func(con, p.atomic())?;
2391        match response {
2392            None => {
2393                continue;
2394            }
2395            Some(response) => {
2396                // make sure no watch is left in the connection, even if
2397                // someone forgot to use the pipeline.
2398                cmd("UNWATCH").exec(con)?;
2399                return Ok(response);
2400            }
2401        }
2402    }
2403}
// TODO: support sharded channels in both the RESP2 and the RESP3 clearing logic.
2405
/// Common logic for clearing subscriptions in RESP2 async/sync.
///
/// Records which kind of unsubscribe reply was seen, judged by the first
/// byte of `kind` (`u` for unsubscribe, `p` for punsubscribe), and returns
/// `true` once both kinds were seen and `num`, the remaining subscription
/// count, is zero.
pub fn resp2_is_pub_sub_state_cleared(
    received_unsub: &mut bool,
    received_punsub: &mut bool,
    kind: &[u8],
    num: isize,
) -> bool {
    if let Some(&first) = kind.first() {
        if first == b'u' {
            *received_unsub = true;
        } else if first == b'p' {
            *received_punsub = true;
        }
    }
    num == 0 && *received_unsub && *received_punsub
}
2420
2421/// Common logic for clearing subscriptions in RESP3 async/sync
2422pub fn resp3_is_pub_sub_state_cleared(
2423    received_unsub: &mut bool,
2424    received_punsub: &mut bool,
2425    kind: &PushKind,
2426    num: isize,
2427) -> bool {
2428    match kind {
2429        PushKind::Unsubscribe => *received_unsub = true,
2430        PushKind::PUnsubscribe => *received_punsub = true,
2431        _ => (),
2432    };
2433    *received_unsub && *received_punsub && num == 0
2434}
2435
2436pub fn no_sub_err_is_pub_sub_state_cleared(
2437    received_unsub: &mut bool,
2438    received_punsub: &mut bool,
2439    err: &ServerError,
2440) -> bool {
2441    let details = err.details();
2442    *received_unsub = *received_unsub
2443        || details
2444            .map(|details| details.starts_with("'unsub"))
2445            .unwrap_or_default();
2446    *received_punsub = *received_punsub
2447        || details
2448            .map(|details| details.starts_with("'punsub"))
2449            .unwrap_or_default();
2450    *received_unsub && *received_punsub
2451}
2452
2453/// Common logic for checking real cause of hello3 command error
2454pub fn get_resp3_hello_command_error(err: RedisError) -> RedisError {
2455    if let Some(detail) = err.detail() {
2456        if detail.starts_with("unknown command `HELLO`") {
2457            return (
2458                ErrorKind::RESP3NotSupported,
2459                "Redis Server doesn't support HELLO command therefore resp3 cannot be used",
2460            )
2461                .into();
2462        }
2463    }
2464    err
2465}
2466
2467#[cfg(test)]
2468mod tests {
2469    mod util {
2470        use crate::connection::connection_setup_pipeline;
2471        use crate::{RedisConnectionInfo, cmd};
2472
2473        /// Assures that the given [`RedisConnectionInfo`] sets the given expected library name and version
2474        pub fn assert_lib_name_in_connection_setup_pipeline(
2475            redis_connection_info: &RedisConnectionInfo,
2476            expected_lib_name: &str,
2477            expected_lib_ver: &str,
2478        ) {
2479            // Build the pipeline
2480            let pipeline = connection_setup_pipeline(
2481                redis_connection_info,
2482                false,
2483                #[cfg(feature = "cache-aio")]
2484                None,
2485            )
2486            .0;
2487
2488            let actual_packed_cmds = pipeline
2489                .commands
2490                .iter()
2491                .map(|c| c.get_packed_command())
2492                .collect::<Vec<_>>();
2493
2494            let expected_lib_name_packed_cmd = cmd("CLIENT")
2495                .arg("SETINFO")
2496                .arg("LIB-NAME")
2497                .arg(expected_lib_name)
2498                .get_packed_command();
2499            assert!(actual_packed_cmds.contains(&expected_lib_name_packed_cmd));
2500
2501            let expected_lib_ver_packed_cmd = cmd("CLIENT")
2502                .arg("SETINFO")
2503                .arg("LIB-VER")
2504                .arg(expected_lib_ver)
2505                .get_packed_command();
2506            assert!(actual_packed_cmds.contains(&expected_lib_ver_packed_cmd));
2507        }
2508    }
2509
2510    use super::*;
2511    use util::assert_lib_name_in_connection_setup_pipeline;
2512
2513    #[test]
2514    fn test_parse_redis_url() {
2515        let cases = vec![
2516            ("redis://127.0.0.1", true),
2517            ("redis://[::1]", true),
2518            ("rediss://127.0.0.1", true),
2519            ("rediss://[::1]", true),
2520            ("valkey://127.0.0.1", true),
2521            ("valkey://[::1]", true),
2522            ("valkeys://127.0.0.1", true),
2523            ("valkeys://[::1]", true),
2524            ("redis+unix:///run/redis.sock", true),
2525            ("valkey+unix:///run/valkey.sock", true),
2526            ("unix:///run/redis.sock", true),
2527            ("http://127.0.0.1", false),
2528            ("tcp://127.0.0.1", false),
2529        ];
2530        for (url, expected) in cases.into_iter() {
2531            let res = parse_redis_url(url);
2532            assert_eq!(
2533                res.is_some(),
2534                expected,
2535                "Parsed result of `{url}` is not expected",
2536            );
2537        }
2538    }
2539
2540    #[test]
2541    fn test_url_to_tcp_connection_info() {
2542        let cases = vec![
2543            (
2544                url::Url::parse("redis://127.0.0.1").unwrap(),
2545                ConnectionInfo {
2546                    addr: ConnectionAddr::Tcp("127.0.0.1".to_string(), 6379),
2547                    redis: Default::default(),
2548                    tcp_settings: TcpSettings::default(),
2549                },
2550            ),
2551            (
2552                url::Url::parse("redis://[::1]").unwrap(),
2553                ConnectionInfo {
2554                    addr: ConnectionAddr::Tcp("::1".to_string(), 6379),
2555                    redis: Default::default(),
2556                    tcp_settings: TcpSettings::default(),
2557                },
2558            ),
2559            (
2560                url::Url::parse("redis://%25johndoe%25:%23%40%3C%3E%24@example.com/2").unwrap(),
2561                ConnectionInfo {
2562                    addr: ConnectionAddr::Tcp("example.com".to_string(), 6379),
2563                    redis: RedisConnectionInfo {
2564                        db: 2,
2565                        username: Some("%johndoe%".into()),
2566                        password: Some("#@<>$".into()),
2567                        protocol: ProtocolVersion::RESP2,
2568                        skip_set_lib_name: false,
2569                        lib_name: None,
2570                        lib_ver: None,
2571                    },
2572                    tcp_settings: TcpSettings::default(),
2573                },
2574            ),
2575            (
2576                url::Url::parse("redis://127.0.0.1/?protocol=2").unwrap(),
2577                ConnectionInfo {
2578                    addr: ConnectionAddr::Tcp("127.0.0.1".to_string(), 6379),
2579                    redis: Default::default(),
2580                    tcp_settings: TcpSettings::default(),
2581                },
2582            ),
2583            (
2584                url::Url::parse("redis://127.0.0.1/?protocol=resp3").unwrap(),
2585                ConnectionInfo {
2586                    addr: ConnectionAddr::Tcp("127.0.0.1".to_string(), 6379),
2587                    redis: RedisConnectionInfo {
2588                        db: 0,
2589                        username: None,
2590                        password: None,
2591                        protocol: ProtocolVersion::RESP3,
2592                        skip_set_lib_name: false,
2593                        lib_name: None,
2594                        lib_ver: None,
2595                    },
2596                    tcp_settings: TcpSettings::default(),
2597                },
2598            ),
2599        ];
2600        for (url, expected) in cases.into_iter() {
2601            let res = url_to_tcp_connection_info(url.clone()).unwrap();
2602            assert_eq!(res.addr, expected.addr, "addr of {url} is not expected");
2603            assert_eq!(
2604                res.redis.db, expected.redis.db,
2605                "db of {url} is not expected",
2606            );
2607            assert_eq!(
2608                res.redis.username, expected.redis.username,
2609                "username of {url} is not expected",
2610            );
2611            assert_eq!(
2612                res.redis.password, expected.redis.password,
2613                "password of {url} is not expected",
2614            );
2615        }
2616    }
2617
2618    #[test]
2619    fn test_url_to_tcp_connection_info_failed() {
2620        let cases = vec![
2621            (
2622                url::Url::parse("redis://").unwrap(),
2623                "Missing hostname",
2624                None,
2625            ),
2626            (
2627                url::Url::parse("redis://127.0.0.1/db").unwrap(),
2628                "Invalid database number",
2629                None,
2630            ),
2631            (
2632                url::Url::parse("redis://C3%B0@127.0.0.1").unwrap(),
2633                "Username is not valid UTF-8 string",
2634                None,
2635            ),
2636            (
2637                url::Url::parse("redis://:C3%B0@127.0.0.1").unwrap(),
2638                "Password is not valid UTF-8 string",
2639                None,
2640            ),
2641            (
2642                url::Url::parse("redis://127.0.0.1/?protocol=4").unwrap(),
2643                "Invalid protocol version",
2644                Some("4"),
2645            ),
2646        ];
2647        for (url, expected, detail) in cases.into_iter() {
2648            let res = url_to_tcp_connection_info(url).unwrap_err();
2649            assert_eq!(res.kind(), crate::ErrorKind::InvalidClientConfig,);
2650            let desc = res.to_string();
2651            assert!(desc.contains(expected), "{desc}");
2652            assert_eq!(res.detail(), detail);
2653        }
2654    }
2655
2656    #[test]
2657    #[cfg(unix)]
2658    fn test_url_to_unix_connection_info() {
2659        let cases = vec![
2660            (
2661                url::Url::parse("unix:///var/run/redis.sock").unwrap(),
2662                ConnectionInfo {
2663                    addr: ConnectionAddr::Unix("/var/run/redis.sock".into()),
2664                    redis: RedisConnectionInfo {
2665                        db: 0,
2666                        username: None,
2667                        password: None,
2668                        protocol: ProtocolVersion::RESP2,
2669                        skip_set_lib_name: false,
2670                        lib_name: None,
2671                        lib_ver: None,
2672                    },
2673                    tcp_settings: Default::default(),
2674                },
2675            ),
2676            (
2677                url::Url::parse("redis+unix:///var/run/redis.sock?db=1").unwrap(),
2678                ConnectionInfo {
2679                    addr: ConnectionAddr::Unix("/var/run/redis.sock".into()),
2680                    redis: RedisConnectionInfo {
2681                        db: 1,
2682                        username: None,
2683                        password: None,
2684                        protocol: ProtocolVersion::RESP2,
2685                        skip_set_lib_name: false,
2686                        lib_name: None,
2687                        lib_ver: None,
2688                    },
2689                    tcp_settings: TcpSettings::default(),
2690                },
2691            ),
2692            (
2693                url::Url::parse(
2694                    "unix:///example.sock?user=%25johndoe%25&pass=%23%40%3C%3E%24&db=2",
2695                )
2696                .unwrap(),
2697                ConnectionInfo {
2698                    addr: ConnectionAddr::Unix("/example.sock".into()),
2699                    redis: RedisConnectionInfo {
2700                        db: 2,
2701                        username: Some("%johndoe%".into()),
2702                        password: Some("#@<>$".into()),
2703                        protocol: ProtocolVersion::RESP2,
2704                        skip_set_lib_name: false,
2705                        lib_name: None,
2706                        lib_ver: None,
2707                    },
2708                    tcp_settings: TcpSettings::default(),
2709                },
2710            ),
2711            (
2712                url::Url::parse(
2713                    "redis+unix:///example.sock?pass=%26%3F%3D+%2A%2B&db=2&user=%25johndoe%25",
2714                )
2715                .unwrap(),
2716                ConnectionInfo {
2717                    addr: ConnectionAddr::Unix("/example.sock".into()),
2718                    redis: RedisConnectionInfo {
2719                        db: 2,
2720                        username: Some("%johndoe%".into()),
2721                        password: Some("&?= *+".into()),
2722                        protocol: ProtocolVersion::RESP2,
2723                        skip_set_lib_name: false,
2724                        lib_name: None,
2725                        lib_ver: None,
2726                    },
2727                    tcp_settings: TcpSettings::default(),
2728                },
2729            ),
2730            (
2731                url::Url::parse("redis+unix:///var/run/redis.sock?protocol=3").unwrap(),
2732                ConnectionInfo {
2733                    addr: ConnectionAddr::Unix("/var/run/redis.sock".into()),
2734                    redis: RedisConnectionInfo {
2735                        db: 0,
2736                        username: None,
2737                        password: None,
2738                        protocol: ProtocolVersion::RESP3,
2739                        skip_set_lib_name: false,
2740                        lib_name: None,
2741                        lib_ver: None,
2742                    },
2743                    tcp_settings: TcpSettings::default(),
2744                },
2745            ),
2746        ];
2747        for (url, expected) in cases.into_iter() {
2748            assert_eq!(
2749                ConnectionAddr::Unix(url.to_file_path().unwrap()),
2750                expected.addr,
2751                "addr of {url} is not expected",
2752            );
2753            let res = url_to_unix_connection_info(url.clone()).unwrap();
2754            assert_eq!(res.addr, expected.addr, "addr of {url} is not expected");
2755            assert_eq!(
2756                res.redis.db, expected.redis.db,
2757                "db of {url} is not expected",
2758            );
2759            assert_eq!(
2760                res.redis.username, expected.redis.username,
2761                "username of {url} is not expected",
2762            );
2763            assert_eq!(
2764                res.redis.password, expected.redis.password,
2765                "password of {url} is not expected",
2766            );
2767        }
2768    }
2769
2770    #[test]
2771    fn redis_connection_info_lib_name_default() {
2772        let redis_connection_info = RedisConnectionInfo::default();
2773
2774        // Check the accessors
2775        assert_eq!(redis_connection_info.lib_name(), None);
2776        assert_eq!(redis_connection_info.lib_ver(), None);
2777
2778        // Check the connection setup pipeline
2779        assert_lib_name_in_connection_setup_pipeline(
2780            &redis_connection_info,
2781            DEFAULT_CLIENT_SETINFO_LIB_NAME,
2782            DEFAULT_CLIENT_SETINFO_LIB_VER,
2783        );
2784    }
2785
2786    #[test]
2787    fn redis_connection_info_lib_name_custom() {
2788        let mut redis_connection_info = RedisConnectionInfo::default();
2789
2790        // Mark to skip setting the lib_name. This allows to test that `set_lib_name` clears it again;
2791        redis_connection_info = redis_connection_info.set_skip_set_lib_name();
2792        assert!(redis_connection_info.skip_set_lib_name());
2793
2794        // Set the lib_name
2795        redis_connection_info = redis_connection_info.set_lib_name("foo", "42.4711");
2796
2797        // Check the accessors
2798        assert!(!redis_connection_info.skip_set_lib_name());
2799        assert_eq!(redis_connection_info.lib_name(), Some("foo"));
2800        assert_eq!(redis_connection_info.lib_ver(), Some("42.4711"));
2801
2802        // Check the connection setup pipeline
2803        assert_lib_name_in_connection_setup_pipeline(&redis_connection_info, "foo", "42.4711");
2804    }
2805}