1use std::borrow::Cow;
2use std::collections::VecDeque;
3use std::fmt;
4use std::io::{self, Write};
5use std::net::{self, SocketAddr, TcpStream, ToSocketAddrs};
6use std::ops::DerefMut;
7use std::path::PathBuf;
8use std::str::{FromStr, from_utf8};
9use std::time::{Duration, Instant};
10
11use crate::cmd::{Cmd, cmd, pipe};
12use crate::errors::{ErrorKind, RedisError, ServerError, ServerErrorKind};
13use crate::io::tcp::{TcpSettings, stream_with_settings};
14use crate::parser::Parser;
15use crate::pipeline::Pipeline;
16use crate::types::{
17 FromRedisValue, HashMap, PushKind, RedisResult, SyncPushSender, ToRedisArgs, Value,
18 from_redis_value_ref,
19};
20use crate::{ProtocolVersion, check_resp3, from_redis_value};
21
22#[cfg(unix)]
23use std::os::unix::net::UnixStream;
24
25use crate::commands::resp3_hello;
26use arcstr::ArcStr;
27#[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
28use native_tls::{TlsConnector, TlsStream};
29
30#[cfg(feature = "tls-rustls")]
31use rustls::{RootCertStore, StreamOwned};
32#[cfg(feature = "tls-rustls")]
33use std::sync::Arc;
34
35use crate::PushInfo;
36
37#[cfg(all(
38 feature = "tls-rustls",
39 not(feature = "tls-native-tls"),
40 not(feature = "tls-rustls-webpki-roots")
41))]
42use rustls_native_certs::load_native_certs;
43
44#[cfg(feature = "tls-rustls")]
45use crate::tls::ClientTlsParams;
46
47#[derive(Clone, Debug)]
49pub struct TlsConnParams {
50 #[cfg(feature = "tls-rustls")]
51 pub(crate) client_tls_params: Option<ClientTlsParams>,
52 #[cfg(feature = "tls-rustls")]
53 pub(crate) root_cert_store: Option<RootCertStore>,
54 #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
55 pub(crate) danger_accept_invalid_hostnames: bool,
56}
57
58static DEFAULT_PORT: u16 = 6379;
59
60const DEFAULT_CLIENT_SETINFO_LIB_NAME: &str = "redis-rs";
62const DEFAULT_CLIENT_SETINFO_LIB_VER: &str = env!("CARGO_PKG_VERSION");
64
65#[inline(always)]
66fn connect_tcp(addr: (&str, u16), tcp_settings: &TcpSettings) -> io::Result<TcpStream> {
67 let socket = TcpStream::connect(addr)?;
68 stream_with_settings(socket, tcp_settings)
69}
70
71#[inline(always)]
72fn connect_tcp_timeout(
73 addr: &SocketAddr,
74 timeout: Duration,
75 tcp_settings: &TcpSettings,
76) -> io::Result<TcpStream> {
77 let socket = TcpStream::connect_timeout(addr, timeout)?;
78 stream_with_settings(socket, tcp_settings)
79}
80
81pub fn parse_redis_url(input: &str) -> Option<url::Url> {
86 match url::Url::parse(input) {
87 Ok(result) => match result.scheme() {
88 "redis" | "rediss" | "valkey" | "valkeys" | "redis+unix" | "valkey+unix" | "unix" => {
89 Some(result)
90 }
91 _ => None,
92 },
93 Err(_) => None,
94 }
95}
96
97#[derive(Clone, Copy, PartialEq)]
101#[non_exhaustive]
102pub enum TlsMode {
103 Secure,
105 Insecure,
107}
108
109#[derive(Clone, Debug)]
115#[non_exhaustive]
116pub enum ConnectionAddr {
117 Tcp(String, u16),
119 TcpTls {
121 host: String,
123 port: u16,
125 insecure: bool,
134
135 tls_params: Option<TlsConnParams>,
137 },
138 Unix(PathBuf),
140}
141
142impl PartialEq for ConnectionAddr {
143 fn eq(&self, other: &Self) -> bool {
144 match (self, other) {
145 (ConnectionAddr::Tcp(host1, port1), ConnectionAddr::Tcp(host2, port2)) => {
146 host1 == host2 && port1 == port2
147 }
148 (
149 ConnectionAddr::TcpTls {
150 host: host1,
151 port: port1,
152 insecure: insecure1,
153 tls_params: _,
154 },
155 ConnectionAddr::TcpTls {
156 host: host2,
157 port: port2,
158 insecure: insecure2,
159 tls_params: _,
160 },
161 ) => port1 == port2 && host1 == host2 && insecure1 == insecure2,
162 (ConnectionAddr::Unix(path1), ConnectionAddr::Unix(path2)) => path1 == path2,
163 _ => false,
164 }
165 }
166}
167
168impl Eq for ConnectionAddr {}
169
170impl ConnectionAddr {
171 pub fn is_supported(&self) -> bool {
182 match *self {
183 ConnectionAddr::Tcp(_, _) => true,
184 ConnectionAddr::TcpTls { .. } => {
185 cfg!(any(feature = "tls-native-tls", feature = "tls-rustls"))
186 }
187 ConnectionAddr::Unix(_) => cfg!(unix),
188 }
189 }
190
191 #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
200 pub fn set_danger_accept_invalid_hostnames(&mut self, insecure: bool) {
201 if let ConnectionAddr::TcpTls { tls_params, .. } = self {
202 if let Some(params) = tls_params {
203 params.danger_accept_invalid_hostnames = insecure;
204 } else if insecure {
205 *tls_params = Some(TlsConnParams {
206 #[cfg(feature = "tls-rustls")]
207 client_tls_params: None,
208 #[cfg(feature = "tls-rustls")]
209 root_cert_store: None,
210 danger_accept_invalid_hostnames: insecure,
211 });
212 }
213 }
214 }
215
216 #[cfg(feature = "cluster")]
217 pub(crate) fn tls_mode(&self) -> Option<TlsMode> {
218 match self {
219 ConnectionAddr::TcpTls { insecure, .. } => {
220 if *insecure {
221 Some(TlsMode::Insecure)
222 } else {
223 Some(TlsMode::Secure)
224 }
225 }
226 _ => None,
227 }
228 }
229}
230
231impl fmt::Display for ConnectionAddr {
232 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
233 match *self {
235 ConnectionAddr::Tcp(ref host, port) => write!(f, "{host}:{port}"),
236 ConnectionAddr::TcpTls { ref host, port, .. } => write!(f, "{host}:{port}"),
237 ConnectionAddr::Unix(ref path) => write!(f, "{}", path.display()),
238 }
239 }
240}
241
242#[derive(Clone, Debug)]
244pub struct ConnectionInfo {
245 pub(crate) addr: ConnectionAddr,
247
248 pub(crate) tcp_settings: TcpSettings,
250 pub(crate) redis: RedisConnectionInfo,
252}
253
254impl ConnectionInfo {
255 pub fn addr(&self) -> &ConnectionAddr {
257 &self.addr
258 }
259
260 pub fn tcp_settings(&self) -> &TcpSettings {
262 &self.tcp_settings
263 }
264
265 pub fn redis_settings(&self) -> &RedisConnectionInfo {
267 &self.redis
268 }
269
270 pub fn set_addr(mut self, addr: ConnectionAddr) -> Self {
272 self.addr = addr;
273 self
274 }
275
276 pub fn set_tcp_settings(mut self, tcp_settings: TcpSettings) -> Self {
278 self.tcp_settings = tcp_settings;
279 self
280 }
281
282 pub fn set_redis_settings(mut self, redis: RedisConnectionInfo) -> Self {
284 self.redis = redis;
285 self
286 }
287}
288
289#[derive(Clone, Default)]
291pub struct RedisConnectionInfo {
292 pub(crate) db: i64,
294 pub(crate) username: Option<ArcStr>,
296 pub(crate) password: Option<ArcStr>,
298 pub(crate) protocol: ProtocolVersion,
300 pub(crate) skip_set_lib_name: bool,
302 pub(crate) lib_name: Option<ArcStr>,
304 pub(crate) lib_ver: Option<ArcStr>,
306}
307
308impl RedisConnectionInfo {
309 pub fn username(&self) -> Option<&str> {
311 self.username.as_deref()
312 }
313
314 pub fn password(&self) -> Option<&str> {
316 self.password.as_deref()
317 }
318
319 pub fn protocol(&self) -> ProtocolVersion {
321 self.protocol
322 }
323
324 pub fn skip_set_lib_name(&self) -> bool {
326 self.skip_set_lib_name
327 }
328
329 pub fn lib_name(&self) -> Option<&str> {
331 self.lib_name.as_deref()
332 }
333
334 pub fn lib_ver(&self) -> Option<&str> {
336 self.lib_ver.as_deref()
337 }
338
339 pub fn db(&self) -> i64 {
341 self.db
342 }
343
344 pub fn set_username(mut self, username: impl AsRef<str>) -> Self {
346 self.username = Some(username.as_ref().into());
347 self
348 }
349
350 pub fn set_password(mut self, password: impl AsRef<str>) -> Self {
352 self.password = Some(password.as_ref().into());
353 self
354 }
355
356 pub fn set_protocol(mut self, protocol: ProtocolVersion) -> Self {
358 self.protocol = protocol;
359 self
360 }
361
362 pub fn set_skip_set_lib_name(mut self) -> Self {
367 self.skip_set_lib_name = true;
368 self
369 }
370
371 pub fn set_lib_name(mut self, lib_name: impl AsRef<str>, lib_ver: impl AsRef<str>) -> Self {
375 self.lib_name = Some(lib_name.as_ref().into());
376 self.lib_ver = Some(lib_ver.as_ref().into());
377 self.skip_set_lib_name = false;
378 self
379 }
380
381 pub fn set_db(mut self, db: i64) -> Self {
383 self.db = db;
384 self
385 }
386}
387
388impl std::fmt::Debug for RedisConnectionInfo {
389 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390 let RedisConnectionInfo {
391 db,
392 username,
393 password,
394 protocol,
395 skip_set_lib_name,
396 lib_name,
397 lib_ver,
398 } = self;
399 let mut debug_info = f.debug_struct("RedisConnectionInfo");
400
401 debug_info.field("db", &db);
402 debug_info.field("username", &username);
403 debug_info.field("password", &password.as_ref().map(|_| "<redacted>"));
404 debug_info.field("protocol", &protocol);
405 debug_info.field("skip_set_lib_name", &skip_set_lib_name);
406 debug_info.field("lib_name", &lib_name);
407 debug_info.field("lib_ver", &lib_ver);
408
409 debug_info.finish()
410 }
411}
412
413impl FromStr for ConnectionInfo {
414 type Err = RedisError;
415
416 fn from_str(s: &str) -> Result<Self, Self::Err> {
417 s.into_connection_info()
418 }
419}
420
421pub trait IntoConnectionInfo {
425 fn into_connection_info(self) -> RedisResult<ConnectionInfo>;
427}
428
429impl IntoConnectionInfo for ConnectionInfo {
430 fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
431 Ok(self)
432 }
433}
434
435impl IntoConnectionInfo for ConnectionAddr {
436 fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
437 Ok(ConnectionInfo {
438 addr: self,
439 redis: Default::default(),
440 tcp_settings: Default::default(),
441 })
442 }
443}
444
445impl IntoConnectionInfo for &str {
455 fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
456 match parse_redis_url(self) {
457 Some(u) => u.into_connection_info(),
458 None => fail!((ErrorKind::InvalidClientConfig, "Redis URL did not parse")),
459 }
460 }
461}
462
463impl<T> IntoConnectionInfo for (T, u16)
464where
465 T: Into<String>,
466{
467 fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
468 Ok(ConnectionInfo {
469 addr: ConnectionAddr::Tcp(self.0.into(), self.1),
470 redis: RedisConnectionInfo::default(),
471 tcp_settings: TcpSettings::default(),
472 })
473 }
474}
475
476impl IntoConnectionInfo for String {
486 fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
487 match parse_redis_url(&self) {
488 Some(u) => u.into_connection_info(),
489 None => fail!((ErrorKind::InvalidClientConfig, "Redis URL did not parse")),
490 }
491 }
492}
493
494fn parse_protocol(query: &HashMap<Cow<str>, Cow<str>>) -> RedisResult<ProtocolVersion> {
495 Ok(match query.get("protocol") {
496 Some(protocol) => {
497 if protocol == "2" || protocol == "resp2" {
498 ProtocolVersion::RESP2
499 } else if protocol == "3" || protocol == "resp3" {
500 ProtocolVersion::RESP3
501 } else {
502 fail!((
503 ErrorKind::InvalidClientConfig,
504 "Invalid protocol version",
505 protocol.to_string()
506 ))
507 }
508 }
509 None => ProtocolVersion::RESP2,
510 })
511}
512
513#[inline]
514pub(crate) fn is_wildcard_address(address: &str) -> bool {
515 address == "0.0.0.0" || address == "::"
516}
517
518fn url_to_tcp_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
519 let host = match url.host() {
520 Some(host) => {
521 let host_str = match host {
533 url::Host::Domain(path) => path.to_string(),
534 url::Host::Ipv4(v4) => v4.to_string(),
535 url::Host::Ipv6(v6) => v6.to_string(),
536 };
537
538 if is_wildcard_address(&host_str) {
539 return Err(RedisError::from((
540 ErrorKind::InvalidClientConfig,
541 "Cannot connect to a wildcard address (0.0.0.0 or ::)",
542 )));
543 }
544 host_str
545 }
546 None => fail!((ErrorKind::InvalidClientConfig, "Missing hostname")),
547 };
548 let port = url.port().unwrap_or(DEFAULT_PORT);
549 let addr = if url.scheme() == "rediss" || url.scheme() == "valkeys" {
550 #[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
551 {
552 match url.fragment() {
553 Some("insecure") => ConnectionAddr::TcpTls {
554 host,
555 port,
556 insecure: true,
557 tls_params: None,
558 },
559 Some(_) => fail!((
560 ErrorKind::InvalidClientConfig,
561 "only #insecure is supported as URL fragment"
562 )),
563 _ => ConnectionAddr::TcpTls {
564 host,
565 port,
566 insecure: false,
567 tls_params: None,
568 },
569 }
570 }
571
572 #[cfg(not(any(feature = "tls-native-tls", feature = "tls-rustls")))]
573 fail!((
574 ErrorKind::InvalidClientConfig,
575 "can't connect with TLS, the feature is not enabled"
576 ));
577 } else {
578 ConnectionAddr::Tcp(host, port)
579 };
580 let query: HashMap<_, _> = url.query_pairs().collect();
581 Ok(ConnectionInfo {
582 addr,
583 redis: RedisConnectionInfo {
584 db: match url.path().trim_matches('/') {
585 "" => 0,
586 path => path.parse::<i64>().map_err(|_| -> RedisError {
587 (ErrorKind::InvalidClientConfig, "Invalid database number").into()
588 })?,
589 },
590 username: if url.username().is_empty() {
591 None
592 } else {
593 match percent_encoding::percent_decode(url.username().as_bytes()).decode_utf8() {
594 Ok(decoded) => Some(decoded.into()),
595 Err(_) => fail!((
596 ErrorKind::InvalidClientConfig,
597 "Username is not valid UTF-8 string"
598 )),
599 }
600 },
601 password: match url.password() {
602 Some(pw) => match percent_encoding::percent_decode(pw.as_bytes()).decode_utf8() {
603 Ok(decoded) => Some(decoded.into()),
604 Err(_) => fail!((
605 ErrorKind::InvalidClientConfig,
606 "Password is not valid UTF-8 string"
607 )),
608 },
609 None => None,
610 },
611 protocol: parse_protocol(&query)?,
612 skip_set_lib_name: false,
613 lib_name: None,
614 lib_ver: None,
615 },
616 tcp_settings: TcpSettings::default(),
617 })
618}
619
620#[cfg(unix)]
621fn url_to_unix_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
622 let query: HashMap<_, _> = url.query_pairs().collect();
623 Ok(ConnectionInfo {
624 addr: ConnectionAddr::Unix(url.to_file_path().map_err(|_| -> RedisError {
625 (ErrorKind::InvalidClientConfig, "Missing path").into()
626 })?),
627 redis: RedisConnectionInfo {
628 db: match query.get("db") {
629 Some(db) => db.parse::<i64>().map_err(|_| -> RedisError {
630 (ErrorKind::InvalidClientConfig, "Invalid database number").into()
631 })?,
632
633 None => 0,
634 },
635 username: query.get("user").map(|username| username.as_ref().into()),
636 password: query.get("pass").map(|password| password.as_ref().into()),
637 protocol: parse_protocol(&query)?,
638 skip_set_lib_name: false,
639 lib_name: None,
640 lib_ver: None,
641 },
642 tcp_settings: TcpSettings::default(),
643 })
644}
645
646#[cfg(not(unix))]
647fn url_to_unix_connection_info(_: url::Url) -> RedisResult<ConnectionInfo> {
648 fail!((
649 ErrorKind::InvalidClientConfig,
650 "Unix sockets are not available on this platform."
651 ));
652}
653
654impl IntoConnectionInfo for url::Url {
655 fn into_connection_info(self) -> RedisResult<ConnectionInfo> {
656 match self.scheme() {
657 "redis" | "rediss" | "valkey" | "valkeys" => url_to_tcp_connection_info(self),
658 "unix" | "redis+unix" | "valkey+unix" => url_to_unix_connection_info(self),
659 _ => fail!((
660 ErrorKind::InvalidClientConfig,
661 "URL provided is not a redis URL"
662 )),
663 }
664 }
665}
666
667struct TcpConnection {
668 reader: TcpStream,
669 open: bool,
670}
671
672#[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
673struct TcpNativeTlsConnection {
674 reader: TlsStream<TcpStream>,
675 open: bool,
676}
677
678#[cfg(feature = "tls-rustls")]
679struct TcpRustlsConnection {
680 reader: StreamOwned<rustls::ClientConnection, TcpStream>,
681 open: bool,
682}
683
684#[cfg(unix)]
685struct UnixConnection {
686 sock: UnixStream,
687 open: bool,
688}
689
690enum ActualConnection {
691 Tcp(TcpConnection),
692 #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
693 TcpNativeTls(Box<TcpNativeTlsConnection>),
694 #[cfg(feature = "tls-rustls")]
695 TcpRustls(Box<TcpRustlsConnection>),
696 #[cfg(unix)]
697 Unix(UnixConnection),
698}
699
700#[cfg(feature = "tls-rustls-insecure")]
701struct NoCertificateVerification {
702 supported: rustls::crypto::WebPkiSupportedAlgorithms,
703}
704
705#[cfg(feature = "tls-rustls-insecure")]
706impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
707 fn verify_server_cert(
708 &self,
709 _end_entity: &rustls::pki_types::CertificateDer<'_>,
710 _intermediates: &[rustls::pki_types::CertificateDer<'_>],
711 _server_name: &rustls::pki_types::ServerName<'_>,
712 _ocsp_response: &[u8],
713 _now: rustls::pki_types::UnixTime,
714 ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
715 Ok(rustls::client::danger::ServerCertVerified::assertion())
716 }
717
718 fn verify_tls12_signature(
719 &self,
720 _message: &[u8],
721 _cert: &rustls::pki_types::CertificateDer<'_>,
722 _dss: &rustls::DigitallySignedStruct,
723 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
724 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
725 }
726
727 fn verify_tls13_signature(
728 &self,
729 _message: &[u8],
730 _cert: &rustls::pki_types::CertificateDer<'_>,
731 _dss: &rustls::DigitallySignedStruct,
732 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
733 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
734 }
735
736 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
737 self.supported.supported_schemes()
738 }
739}
740
741#[cfg(feature = "tls-rustls-insecure")]
742impl fmt::Debug for NoCertificateVerification {
743 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
744 f.debug_struct("NoCertificateVerification").finish()
745 }
746}
747
748#[cfg(feature = "tls-rustls-insecure")]
750#[derive(Debug)]
751struct AcceptInvalidHostnamesCertVerifier {
752 inner: Arc<rustls::client::WebPkiServerVerifier>,
753}
754
755#[cfg(feature = "tls-rustls-insecure")]
756fn is_hostname_error(err: &rustls::Error) -> bool {
757 matches!(
758 err,
759 rustls::Error::InvalidCertificate(
760 rustls::CertificateError::NotValidForName
761 | rustls::CertificateError::NotValidForNameContext { .. }
762 )
763 )
764}
765
766#[cfg(feature = "tls-rustls-insecure")]
767impl rustls::client::danger::ServerCertVerifier for AcceptInvalidHostnamesCertVerifier {
768 fn verify_server_cert(
769 &self,
770 end_entity: &rustls::pki_types::CertificateDer<'_>,
771 intermediates: &[rustls::pki_types::CertificateDer<'_>],
772 server_name: &rustls::pki_types::ServerName<'_>,
773 ocsp_response: &[u8],
774 now: rustls::pki_types::UnixTime,
775 ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
776 self.inner
777 .verify_server_cert(end_entity, intermediates, server_name, ocsp_response, now)
778 .or_else(|err| {
779 if is_hostname_error(&err) {
780 Ok(rustls::client::danger::ServerCertVerified::assertion())
781 } else {
782 Err(err)
783 }
784 })
785 }
786
787 fn verify_tls12_signature(
788 &self,
789 message: &[u8],
790 cert: &rustls::pki_types::CertificateDer<'_>,
791 dss: &rustls::DigitallySignedStruct,
792 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
793 self.inner
794 .verify_tls12_signature(message, cert, dss)
795 .or_else(|err| {
796 if is_hostname_error(&err) {
797 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
798 } else {
799 Err(err)
800 }
801 })
802 }
803
804 fn verify_tls13_signature(
805 &self,
806 message: &[u8],
807 cert: &rustls::pki_types::CertificateDer<'_>,
808 dss: &rustls::DigitallySignedStruct,
809 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
810 self.inner
811 .verify_tls13_signature(message, cert, dss)
812 .or_else(|err| {
813 if is_hostname_error(&err) {
814 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
815 } else {
816 Err(err)
817 }
818 })
819 }
820
821 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
822 self.inner.supported_verify_schemes()
823 }
824}
825
826pub struct Connection {
828 con: ActualConnection,
829 parser: Parser,
830 db: i64,
831
832 pubsub: bool,
837
838 protocol: ProtocolVersion,
840
841 push_sender: Option<SyncPushSender>,
843
844 messages_to_skip: usize,
847}
848
849pub struct PubSub<'a> {
853 con: &'a mut Connection,
854 waiting_messages: VecDeque<Msg>,
855}
856
857#[derive(Debug, Clone)]
859pub struct Msg {
860 payload: Value,
861 channel: Value,
862 pattern: Option<Value>,
863}
864
865impl ActualConnection {
866 pub fn new(
867 addr: &ConnectionAddr,
868 timeout: Option<Duration>,
869 tcp_settings: &TcpSettings,
870 ) -> RedisResult<ActualConnection> {
871 Ok(match *addr {
872 ConnectionAddr::Tcp(ref host, ref port) => {
873 if is_wildcard_address(host) {
874 fail!((
875 ErrorKind::InvalidClientConfig,
876 "Cannot connect to a wildcard address (0.0.0.0 or ::)"
877 ));
878 }
879 let addr = (host.as_str(), *port);
880 let tcp = match timeout {
881 None => connect_tcp(addr, tcp_settings)?,
882 Some(timeout) => {
883 let mut tcp = None;
884 let mut last_error = None;
885 for addr in addr.to_socket_addrs()? {
886 match connect_tcp_timeout(&addr, timeout, tcp_settings) {
887 Ok(l) => {
888 tcp = Some(l);
889 break;
890 }
891 Err(e) => {
892 last_error = Some(e);
893 }
894 };
895 }
896 match (tcp, last_error) {
897 (Some(tcp), _) => tcp,
898 (None, Some(e)) => {
899 fail!(e);
900 }
901 (None, None) => {
902 fail!((
903 ErrorKind::InvalidClientConfig,
904 "could not resolve to any addresses"
905 ));
906 }
907 }
908 }
909 };
910 ActualConnection::Tcp(TcpConnection {
911 reader: tcp,
912 open: true,
913 })
914 }
915 #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
916 ConnectionAddr::TcpTls {
917 ref host,
918 port,
919 insecure,
920 ref tls_params,
921 } => {
922 let tls_connector = if insecure {
923 TlsConnector::builder()
924 .danger_accept_invalid_certs(true)
925 .danger_accept_invalid_hostnames(true)
926 .use_sni(false)
927 .build()?
928 } else if let Some(params) = tls_params {
929 TlsConnector::builder()
930 .danger_accept_invalid_hostnames(params.danger_accept_invalid_hostnames)
931 .build()?
932 } else {
933 TlsConnector::new()?
934 };
935 let addr = (host.as_str(), port);
936 let tls = match timeout {
937 None => {
938 let tcp = connect_tcp(addr, tcp_settings)?;
939 match tls_connector.connect(host, tcp) {
940 Ok(res) => res,
941 Err(e) => {
942 fail!((ErrorKind::Io, "SSL Handshake error", e.to_string()));
943 }
944 }
945 }
946 Some(timeout) => {
947 let mut tcp = None;
948 let mut last_error = None;
949 for addr in (host.as_str(), port).to_socket_addrs()? {
950 match connect_tcp_timeout(&addr, timeout, tcp_settings) {
951 Ok(l) => {
952 tcp = Some(l);
953 break;
954 }
955 Err(e) => {
956 last_error = Some(e);
957 }
958 };
959 }
960 match (tcp, last_error) {
961 (Some(tcp), _) => tls_connector.connect(host, tcp).unwrap(),
962 (None, Some(e)) => {
963 fail!(e);
964 }
965 (None, None) => {
966 fail!((
967 ErrorKind::InvalidClientConfig,
968 "could not resolve to any addresses"
969 ));
970 }
971 }
972 }
973 };
974 ActualConnection::TcpNativeTls(Box::new(TcpNativeTlsConnection {
975 reader: tls,
976 open: true,
977 }))
978 }
979 #[cfg(feature = "tls-rustls")]
980 ConnectionAddr::TcpTls {
981 ref host,
982 port,
983 insecure,
984 ref tls_params,
985 } => {
986 let host: &str = host;
987 let config = create_rustls_config(insecure, tls_params.clone())?;
988 let conn = rustls::ClientConnection::new(
989 Arc::new(config),
990 rustls::pki_types::ServerName::try_from(host)?.to_owned(),
991 )?;
992 let reader = match timeout {
993 None => {
994 let tcp = connect_tcp((host, port), tcp_settings)?;
995 StreamOwned::new(conn, tcp)
996 }
997 Some(timeout) => {
998 let mut tcp = None;
999 let mut last_error = None;
1000 for addr in (host, port).to_socket_addrs()? {
1001 match connect_tcp_timeout(&addr, timeout, tcp_settings) {
1002 Ok(l) => {
1003 tcp = Some(l);
1004 break;
1005 }
1006 Err(e) => {
1007 last_error = Some(e);
1008 }
1009 };
1010 }
1011 match (tcp, last_error) {
1012 (Some(tcp), _) => StreamOwned::new(conn, tcp),
1013 (None, Some(e)) => {
1014 fail!(e);
1015 }
1016 (None, None) => {
1017 fail!((
1018 ErrorKind::InvalidClientConfig,
1019 "could not resolve to any addresses"
1020 ));
1021 }
1022 }
1023 }
1024 };
1025
1026 ActualConnection::TcpRustls(Box::new(TcpRustlsConnection { reader, open: true }))
1027 }
1028 #[cfg(not(any(feature = "tls-native-tls", feature = "tls-rustls")))]
1029 ConnectionAddr::TcpTls { .. } => {
1030 fail!((
1031 ErrorKind::InvalidClientConfig,
1032 "Cannot connect to TCP with TLS without the tls feature"
1033 ));
1034 }
1035 #[cfg(unix)]
1036 ConnectionAddr::Unix(ref path) => ActualConnection::Unix(UnixConnection {
1037 sock: UnixStream::connect(path)?,
1038 open: true,
1039 }),
1040 #[cfg(not(unix))]
1041 ConnectionAddr::Unix(ref _path) => {
1042 fail!((
1043 ErrorKind::InvalidClientConfig,
1044 "Cannot connect to unix sockets \
1045 on this platform"
1046 ));
1047 }
1048 })
1049 }
1050
1051 pub fn send_bytes(&mut self, bytes: &[u8]) -> RedisResult<Value> {
1052 match *self {
1053 ActualConnection::Tcp(ref mut connection) => {
1054 let res = connection.reader.write_all(bytes).map_err(RedisError::from);
1055 match res {
1056 Err(e) => {
1057 if e.is_unrecoverable_error() {
1058 connection.open = false;
1059 }
1060 Err(e)
1061 }
1062 Ok(_) => Ok(Value::Okay),
1063 }
1064 }
1065 #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
1066 ActualConnection::TcpNativeTls(ref mut connection) => {
1067 let res = connection.reader.write_all(bytes).map_err(RedisError::from);
1068 match res {
1069 Err(e) => {
1070 if e.is_unrecoverable_error() {
1071 connection.open = false;
1072 }
1073 Err(e)
1074 }
1075 Ok(_) => Ok(Value::Okay),
1076 }
1077 }
1078 #[cfg(feature = "tls-rustls")]
1079 ActualConnection::TcpRustls(ref mut connection) => {
1080 let res = connection.reader.write_all(bytes).map_err(RedisError::from);
1081 match res {
1082 Err(e) => {
1083 if e.is_unrecoverable_error() {
1084 connection.open = false;
1085 }
1086 Err(e)
1087 }
1088 Ok(_) => Ok(Value::Okay),
1089 }
1090 }
1091 #[cfg(unix)]
1092 ActualConnection::Unix(ref mut connection) => {
1093 let result = connection.sock.write_all(bytes).map_err(RedisError::from);
1094 match result {
1095 Err(e) => {
1096 if e.is_unrecoverable_error() {
1097 connection.open = false;
1098 }
1099 Err(e)
1100 }
1101 Ok(_) => Ok(Value::Okay),
1102 }
1103 }
1104 }
1105 }
1106
1107 pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
1108 match *self {
1109 ActualConnection::Tcp(TcpConnection { ref reader, .. }) => {
1110 reader.set_write_timeout(dur)?;
1111 }
1112 #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
1113 ActualConnection::TcpNativeTls(ref boxed_tls_connection) => {
1114 let reader = &(boxed_tls_connection.reader);
1115 reader.get_ref().set_write_timeout(dur)?;
1116 }
1117 #[cfg(feature = "tls-rustls")]
1118 ActualConnection::TcpRustls(ref boxed_tls_connection) => {
1119 let reader = &(boxed_tls_connection.reader);
1120 reader.get_ref().set_write_timeout(dur)?;
1121 }
1122 #[cfg(unix)]
1123 ActualConnection::Unix(UnixConnection { ref sock, .. }) => {
1124 sock.set_write_timeout(dur)?;
1125 }
1126 }
1127 Ok(())
1128 }
1129
1130 pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
1131 match *self {
1132 ActualConnection::Tcp(TcpConnection { ref reader, .. }) => {
1133 reader.set_read_timeout(dur)?;
1134 }
1135 #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
1136 ActualConnection::TcpNativeTls(ref boxed_tls_connection) => {
1137 let reader = &(boxed_tls_connection.reader);
1138 reader.get_ref().set_read_timeout(dur)?;
1139 }
1140 #[cfg(feature = "tls-rustls")]
1141 ActualConnection::TcpRustls(ref boxed_tls_connection) => {
1142 let reader = &(boxed_tls_connection.reader);
1143 reader.get_ref().set_read_timeout(dur)?;
1144 }
1145 #[cfg(unix)]
1146 ActualConnection::Unix(UnixConnection { ref sock, .. }) => {
1147 sock.set_read_timeout(dur)?;
1148 }
1149 }
1150 Ok(())
1151 }
1152
1153 pub fn is_open(&self) -> bool {
1154 match *self {
1155 ActualConnection::Tcp(TcpConnection { open, .. }) => open,
1156 #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
1157 ActualConnection::TcpNativeTls(ref boxed_tls_connection) => boxed_tls_connection.open,
1158 #[cfg(feature = "tls-rustls")]
1159 ActualConnection::TcpRustls(ref boxed_tls_connection) => boxed_tls_connection.open,
1160 #[cfg(unix)]
1161 ActualConnection::Unix(UnixConnection { open, .. }) => open,
1162 }
1163 }
1164}
1165
1166#[cfg(feature = "tls-rustls")]
1167pub(crate) fn create_rustls_config(
1168 insecure: bool,
1169 tls_params: Option<TlsConnParams>,
1170) -> RedisResult<rustls::ClientConfig> {
1171 #[allow(unused_mut)]
1172 let mut root_store = RootCertStore::empty();
1173 #[cfg(feature = "tls-rustls-webpki-roots")]
1174 root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
1175 #[cfg(all(
1176 feature = "tls-rustls",
1177 not(feature = "tls-native-tls"),
1178 not(feature = "tls-rustls-webpki-roots")
1179 ))]
1180 {
1181 let mut certificate_result = load_native_certs();
1182 if let Some(error) = certificate_result.errors.pop() {
1183 return Err(error.into());
1184 }
1185 for cert in certificate_result.certs {
1186 root_store.add(cert)?;
1187 }
1188 }
1189
1190 let config = rustls::ClientConfig::builder();
1191 let config = if let Some(tls_params) = tls_params {
1192 let root_cert_store = tls_params.root_cert_store.unwrap_or(root_store);
1193 let config_builder = config.with_root_certificates(root_cert_store.clone());
1194
1195 let config_builder = if let Some(ClientTlsParams {
1196 client_cert_chain: client_cert,
1197 client_key,
1198 }) = tls_params.client_tls_params
1199 {
1200 config_builder
1201 .with_client_auth_cert(client_cert, client_key)
1202 .map_err(|err| {
1203 RedisError::from((
1204 ErrorKind::InvalidClientConfig,
1205 "Unable to build client with TLS parameters provided.",
1206 err.to_string(),
1207 ))
1208 })?
1209 } else {
1210 config_builder.with_no_client_auth()
1211 };
1212
1213 #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
1219 let config_builder = if !insecure && tls_params.danger_accept_invalid_hostnames {
1220 #[cfg(not(feature = "tls-rustls-insecure"))]
1221 {
1222 fail!((
1225 ErrorKind::InvalidClientConfig,
1226 "Cannot create insecure client via danger_accept_invalid_hostnames without tls-rustls-insecure feature"
1227 ));
1228 }
1229
1230 #[cfg(feature = "tls-rustls-insecure")]
1231 {
1232 let mut config = config_builder;
1233 config.dangerous().set_certificate_verifier(Arc::new(
1234 AcceptInvalidHostnamesCertVerifier {
1235 inner: rustls::client::WebPkiServerVerifier::builder(Arc::new(
1236 root_cert_store,
1237 ))
1238 .build()
1239 .map_err(|err| rustls::Error::from(rustls::OtherError(Arc::new(err))))?,
1240 },
1241 ));
1242 config
1243 }
1244 } else {
1245 config_builder
1246 };
1247
1248 config_builder
1249 } else {
1250 config
1251 .with_root_certificates(root_store)
1252 .with_no_client_auth()
1253 };
1254
1255 match (insecure, cfg!(feature = "tls-rustls-insecure")) {
1256 #[cfg(feature = "tls-rustls-insecure")]
1257 (true, true) => {
1258 let mut config = config;
1259 config.enable_sni = false;
1260 let Some(crypto_provider) = rustls::crypto::CryptoProvider::get_default() else {
1261 return Err(RedisError::from((
1262 ErrorKind::InvalidClientConfig,
1263 "No crypto provider available for rustls",
1264 )));
1265 };
1266 config
1267 .dangerous()
1268 .set_certificate_verifier(Arc::new(NoCertificateVerification {
1269 supported: crypto_provider.signature_verification_algorithms,
1270 }));
1271
1272 Ok(config)
1273 }
1274 (true, false) => {
1275 fail!((
1276 ErrorKind::InvalidClientConfig,
1277 "Cannot create insecure client without tls-rustls-insecure feature"
1278 ));
1279 }
1280 _ => Ok(config),
1281 }
1282}
1283
1284pub(crate) fn authenticate_cmd(username: Option<&str>, password: &str) -> Cmd {
1285 let mut command = cmd("AUTH");
1286
1287 if let Some(username) = &username {
1288 command.arg(username);
1289 }
1290
1291 command.arg(password);
1292 command
1293}
1294
1295pub fn connect(
1296 connection_info: &ConnectionInfo,
1297 timeout: Option<Duration>,
1298) -> RedisResult<Connection> {
1299 let start = Instant::now();
1300 let con: ActualConnection = ActualConnection::new(
1301 &connection_info.addr,
1302 timeout,
1303 &connection_info.tcp_settings,
1304 )?;
1305
1306 let remaining_timeout = timeout.and_then(|timeout| timeout.checked_sub(start.elapsed()));
1308 if timeout.is_some() && remaining_timeout.is_none() {
1310 return Err(RedisError::from(std::io::Error::new(
1311 std::io::ErrorKind::TimedOut,
1312 "Connection timed out",
1313 )));
1314 }
1315 con.set_read_timeout(remaining_timeout)?;
1316 con.set_write_timeout(remaining_timeout)?;
1317
1318 let con = setup_connection(
1319 con,
1320 &connection_info.redis,
1321 #[cfg(feature = "cache-aio")]
1322 None,
1323 )?;
1324
1325 con.set_read_timeout(None)?;
1327 con.set_write_timeout(None)?;
1328
1329 Ok(con)
1330}
1331
1332pub(crate) struct ConnectionSetupComponents {
1333 resp3_auth_cmd_idx: Option<usize>,
1334 resp2_auth_cmd_idx: Option<usize>,
1335 select_cmd_idx: Option<usize>,
1336 #[cfg(feature = "cache-aio")]
1337 cache_cmd_idx: Option<usize>,
1338}
1339
1340pub(crate) fn connection_setup_pipeline(
1341 connection_info: &RedisConnectionInfo,
1342 check_username: bool,
1343 #[cfg(feature = "cache-aio")] cache_config: Option<crate::caching::CacheConfig>,
1344) -> (crate::Pipeline, ConnectionSetupComponents) {
1345 let mut pipeline = pipe();
1346 let (authenticate_with_resp3_cmd_index, authenticate_with_resp2_cmd_index) =
1347 if connection_info.protocol.supports_resp3() {
1348 pipeline.add_command(resp3_hello(connection_info));
1349 (Some(0), None)
1350 } else if let Some(password) = connection_info.password.as_ref() {
1351 pipeline.add_command(authenticate_cmd(
1352 check_username.then(|| connection_info.username()).flatten(),
1353 password,
1354 ));
1355 (None, Some(0))
1356 } else {
1357 (None, None)
1358 };
1359
1360 let select_db_cmd_index = (connection_info.db != 0)
1361 .then(|| pipeline.len())
1362 .inspect(|_| {
1363 pipeline.cmd("SELECT").arg(connection_info.db);
1364 });
1365
1366 #[cfg(feature = "cache-aio")]
1367 let cache_cmd_index = cache_config.map(|cache_config| {
1368 pipeline.cmd("CLIENT").arg("TRACKING").arg("ON");
1369 match cache_config.mode {
1370 crate::caching::CacheMode::All => {}
1371 crate::caching::CacheMode::OptIn => {
1372 pipeline.arg("OPTIN");
1373 }
1374 }
1375 pipeline.len() - 1
1376 });
1377
1378 if !connection_info.skip_set_lib_name {
1381 pipeline
1382 .cmd("CLIENT")
1383 .arg("SETINFO")
1384 .arg("LIB-NAME")
1385 .arg(
1386 connection_info
1387 .lib_name
1388 .as_ref()
1389 .map_or(DEFAULT_CLIENT_SETINFO_LIB_NAME, ArcStr::as_str),
1390 )
1391 .ignore();
1392 pipeline
1393 .cmd("CLIENT")
1394 .arg("SETINFO")
1395 .arg("LIB-VER")
1396 .arg(
1397 connection_info
1398 .lib_ver
1399 .as_ref()
1400 .map_or(DEFAULT_CLIENT_SETINFO_LIB_VER, ArcStr::as_str),
1401 )
1402 .ignore();
1403 }
1404
1405 (
1406 pipeline,
1407 ConnectionSetupComponents {
1408 resp3_auth_cmd_idx: authenticate_with_resp3_cmd_index,
1409 resp2_auth_cmd_idx: authenticate_with_resp2_cmd_index,
1410 select_cmd_idx: select_db_cmd_index,
1411 #[cfg(feature = "cache-aio")]
1412 cache_cmd_idx: cache_cmd_index,
1413 },
1414 )
1415}
1416
1417fn check_resp3_auth(result: &Value) -> RedisResult<()> {
1418 if let Value::ServerError(err) = result {
1419 return Err(get_resp3_hello_command_error(err.clone().into()));
1420 }
1421 Ok(())
1422}
1423
1424#[derive(PartialEq)]
1425pub(crate) enum AuthResult {
1426 Succeeded,
1427 ShouldRetryWithoutUsername,
1428}
1429
1430fn check_resp2_auth(result: &Value) -> RedisResult<AuthResult> {
1431 let err = match result {
1432 Value::Okay => {
1433 return Ok(AuthResult::Succeeded);
1434 }
1435 Value::ServerError(err) => err,
1436 _ => {
1437 return Err((
1438 ServerErrorKind::ResponseError.into(),
1439 "Redis server refused to authenticate, returns Ok() != Value::Okay",
1440 )
1441 .into());
1442 }
1443 };
1444
1445 let err_msg = err.details().ok_or((
1446 ErrorKind::AuthenticationFailed,
1447 "Password authentication failed",
1448 ))?;
1449 if !err_msg.contains("wrong number of arguments for 'auth' command") {
1450 return Err((
1451 ErrorKind::AuthenticationFailed,
1452 "Password authentication failed",
1453 )
1454 .into());
1455 }
1456 Ok(AuthResult::ShouldRetryWithoutUsername)
1457}
1458
1459fn check_db_select(value: &Value) -> RedisResult<()> {
1460 let Value::ServerError(err) = value else {
1461 return Ok(());
1462 };
1463
1464 match err.details() {
1465 Some(err_msg) => Err((
1466 ServerErrorKind::ResponseError.into(),
1467 "Redis server refused to switch database",
1468 err_msg.to_string(),
1469 )
1470 .into()),
1471 None => Err((
1472 ServerErrorKind::ResponseError.into(),
1473 "Redis server refused to switch database",
1474 )
1475 .into()),
1476 }
1477}
1478
1479#[cfg(feature = "cache-aio")]
1480fn check_caching(result: &Value) -> RedisResult<()> {
1481 match result {
1482 Value::Okay => Ok(()),
1483 _ => Err((
1484 ServerErrorKind::ResponseError.into(),
1485 "Client-side caching returned unknown response",
1486 format!("{result:?}"),
1487 )
1488 .into()),
1489 }
1490}
1491
1492pub(crate) fn check_connection_setup(
1493 results: Vec<Value>,
1494 ConnectionSetupComponents {
1495 resp3_auth_cmd_idx,
1496 resp2_auth_cmd_idx,
1497 select_cmd_idx,
1498 #[cfg(feature = "cache-aio")]
1499 cache_cmd_idx,
1500 }: ConnectionSetupComponents,
1501) -> RedisResult<AuthResult> {
1502 assert!(!(resp2_auth_cmd_idx.is_some() && resp3_auth_cmd_idx.is_some()));
1504
1505 if let Some(index) = resp3_auth_cmd_idx {
1506 let Some(value) = results.get(index) else {
1507 return Err((ErrorKind::Client, "Missing RESP3 auth response").into());
1508 };
1509 check_resp3_auth(value)?;
1510 } else if let Some(index) = resp2_auth_cmd_idx {
1511 let Some(value) = results.get(index) else {
1512 return Err((ErrorKind::Client, "Missing RESP2 auth response").into());
1513 };
1514 if check_resp2_auth(value)? == AuthResult::ShouldRetryWithoutUsername {
1515 return Ok(AuthResult::ShouldRetryWithoutUsername);
1516 }
1517 }
1518
1519 if let Some(index) = select_cmd_idx {
1520 let Some(value) = results.get(index) else {
1521 return Err((ErrorKind::Client, "Missing SELECT DB response").into());
1522 };
1523 check_db_select(value)?;
1524 }
1525
1526 #[cfg(feature = "cache-aio")]
1527 if let Some(index) = cache_cmd_idx {
1528 let Some(value) = results.get(index) else {
1529 return Err((ErrorKind::Client, "Missing Caching response").into());
1530 };
1531 check_caching(value)?;
1532 }
1533
1534 Ok(AuthResult::Succeeded)
1535}
1536
1537fn execute_connection_pipeline(
1538 rv: &mut Connection,
1539 (pipeline, instructions): (crate::Pipeline, ConnectionSetupComponents),
1540) -> RedisResult<AuthResult> {
1541 if pipeline.is_empty() {
1542 return Ok(AuthResult::Succeeded);
1543 }
1544 let results = rv.req_packed_commands(&pipeline.get_packed_pipeline(), 0, pipeline.len())?;
1545
1546 check_connection_setup(results, instructions)
1547}
1548
1549fn setup_connection(
1550 con: ActualConnection,
1551 connection_info: &RedisConnectionInfo,
1552 #[cfg(feature = "cache-aio")] cache_config: Option<crate::caching::CacheConfig>,
1553) -> RedisResult<Connection> {
1554 let mut rv = Connection {
1555 con,
1556 parser: Parser::new(),
1557 db: connection_info.db,
1558 pubsub: false,
1559 protocol: connection_info.protocol,
1560 push_sender: None,
1561 messages_to_skip: 0,
1562 };
1563
1564 if execute_connection_pipeline(
1565 &mut rv,
1566 connection_setup_pipeline(
1567 connection_info,
1568 true,
1569 #[cfg(feature = "cache-aio")]
1570 cache_config,
1571 ),
1572 )? == AuthResult::ShouldRetryWithoutUsername
1573 {
1574 execute_connection_pipeline(
1575 &mut rv,
1576 connection_setup_pipeline(
1577 connection_info,
1578 false,
1579 #[cfg(feature = "cache-aio")]
1580 cache_config,
1581 ),
1582 )?;
1583 }
1584
1585 Ok(rv)
1586}
1587
1588pub trait ConnectionLike {
1600 fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value>;
1603
1604 #[doc(hidden)]
1612 fn req_packed_commands(
1613 &mut self,
1614 cmd: &[u8],
1615 offset: usize,
1616 count: usize,
1617 ) -> RedisResult<Vec<Value>>;
1618
1619 fn req_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
1621 let pcmd = cmd.get_packed_command();
1622 self.req_packed_command(&pcmd)
1623 }
1624
1625 fn get_db(&self) -> i64;
1630
1631 #[doc(hidden)]
1633 fn supports_pipelining(&self) -> bool {
1634 true
1635 }
1636
1637 fn check_connection(&mut self) -> bool;
1639
1640 fn is_open(&self) -> bool;
1648}
1649
1650impl Connection {
1658 pub fn send_packed_command(&mut self, cmd: &[u8]) -> RedisResult<()> {
1663 self.send_bytes(cmd)?;
1664 Ok(())
1665 }
1666
1667 pub fn recv_response(&mut self) -> RedisResult<Value> {
1670 self.read(true)
1671 }
1672
1673 pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
1679 self.con.set_write_timeout(dur)
1680 }
1681
1682 pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
1688 self.con.set_read_timeout(dur)
1689 }
1690
1691 pub fn as_pubsub(&mut self) -> PubSub<'_> {
1693 PubSub::new(self)
1697 }
1698
1699 fn exit_pubsub(&mut self) -> RedisResult<()> {
1700 let res = self.clear_active_subscriptions();
1701 if res.is_ok() {
1702 self.pubsub = false;
1703 } else {
1704 self.pubsub = true;
1706 }
1707
1708 res
1709 }
1710
1711 fn clear_active_subscriptions(&mut self) -> RedisResult<()> {
1716 {
1722 let unsubscribe = cmd("UNSUBSCRIBE").get_packed_command();
1724 let punsubscribe = cmd("PUNSUBSCRIBE").get_packed_command();
1725
1726 self.send_bytes(&unsubscribe)?;
1728 self.send_bytes(&punsubscribe)?;
1729 }
1730
1731 let mut received_unsub = false;
1737 let mut received_punsub = false;
1738
1739 loop {
1740 let resp = self.recv_response()?;
1741
1742 match resp {
1743 Value::Push { kind, data } => {
1744 if data.len() >= 2 {
1745 if let Value::Int(num) = data[1] {
1746 if resp3_is_pub_sub_state_cleared(
1747 &mut received_unsub,
1748 &mut received_punsub,
1749 &kind,
1750 num as isize,
1751 ) {
1752 break;
1753 }
1754 }
1755 }
1756 }
1757 Value::ServerError(err) => {
1758 if err.kind() == Some(ServerErrorKind::NoSub) {
1761 if no_sub_err_is_pub_sub_state_cleared(
1762 &mut received_unsub,
1763 &mut received_punsub,
1764 &err,
1765 ) {
1766 break;
1767 } else {
1768 continue;
1769 }
1770 }
1771
1772 return Err(err.into());
1773 }
1774 Value::Array(vec) => {
1775 let res: (Vec<u8>, (), isize) = from_redis_value(Value::Array(vec))?;
1776 if resp2_is_pub_sub_state_cleared(
1777 &mut received_unsub,
1778 &mut received_punsub,
1779 &res.0,
1780 res.2,
1781 ) {
1782 break;
1783 }
1784 }
1785 _ => {
1786 return Err((
1787 ErrorKind::Client,
1788 "Unexpected unsubscribe response",
1789 format!("{resp:?}"),
1790 )
1791 .into());
1792 }
1793 }
1794 }
1795
1796 Ok(())
1799 }
1800
1801 fn send_push(&self, push: PushInfo) {
1802 if let Some(sender) = &self.push_sender {
1803 let _ = sender.send(push);
1804 }
1805 }
1806
1807 fn try_send(&self, value: &RedisResult<Value>) {
1808 if let Ok(Value::Push { kind, data }) = value {
1809 self.send_push(PushInfo {
1810 kind: kind.clone(),
1811 data: data.clone(),
1812 });
1813 }
1814 }
1815
1816 fn send_disconnect(&self) {
1817 self.send_push(PushInfo::disconnect())
1818 }
1819
1820 fn close_connection(&mut self) {
1821 self.send_disconnect();
1823 match self.con {
1824 ActualConnection::Tcp(ref mut connection) => {
1825 let _ = connection.reader.shutdown(net::Shutdown::Both);
1826 connection.open = false;
1827 }
1828 #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
1829 ActualConnection::TcpNativeTls(ref mut connection) => {
1830 let _ = connection.reader.shutdown();
1831 connection.open = false;
1832 }
1833 #[cfg(feature = "tls-rustls")]
1834 ActualConnection::TcpRustls(ref mut connection) => {
1835 let _ = connection.reader.get_mut().shutdown(net::Shutdown::Both);
1836 connection.open = false;
1837 }
1838 #[cfg(unix)]
1839 ActualConnection::Unix(ref mut connection) => {
1840 let _ = connection.sock.shutdown(net::Shutdown::Both);
1841 connection.open = false;
1842 }
1843 }
1844 }
1845
1846 fn read(&mut self, is_response: bool) -> RedisResult<Value> {
1849 loop {
1850 let result = match self.con {
1851 ActualConnection::Tcp(TcpConnection { ref mut reader, .. }) => {
1852 self.parser.parse_value(reader)
1853 }
1854 #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))]
1855 ActualConnection::TcpNativeTls(ref mut boxed_tls_connection) => {
1856 let reader = &mut boxed_tls_connection.reader;
1857 self.parser.parse_value(reader)
1858 }
1859 #[cfg(feature = "tls-rustls")]
1860 ActualConnection::TcpRustls(ref mut boxed_tls_connection) => {
1861 let reader = &mut boxed_tls_connection.reader;
1862 self.parser.parse_value(reader)
1863 }
1864 #[cfg(unix)]
1865 ActualConnection::Unix(UnixConnection { ref mut sock, .. }) => {
1866 self.parser.parse_value(sock)
1867 }
1868 };
1869 self.try_send(&result);
1870
1871 let Err(err) = &result else {
1872 if self.messages_to_skip > 0 {
1873 self.messages_to_skip -= 1;
1874 continue;
1875 }
1876 return result;
1877 };
1878 let Some(io_error) = err.as_io_error() else {
1879 if self.messages_to_skip > 0 {
1880 self.messages_to_skip -= 1;
1881 continue;
1882 }
1883 return result;
1884 };
1885 if io_error.kind() == io::ErrorKind::UnexpectedEof {
1887 self.close_connection();
1888 } else if is_response {
1889 self.messages_to_skip += 1;
1890 }
1891
1892 return result;
1893 }
1894 }
1895
1896 pub fn set_push_sender(&mut self, sender: SyncPushSender) {
1898 self.push_sender = Some(sender);
1899 }
1900
1901 fn send_bytes(&mut self, bytes: &[u8]) -> RedisResult<Value> {
1902 if bytes.is_empty() {
1903 return Err(RedisError::make_empty_command());
1904 }
1905 let result = self.con.send_bytes(bytes);
1906 if self.protocol.supports_resp3() {
1907 if let Err(e) = &result {
1908 if e.is_connection_dropped() {
1909 self.send_disconnect();
1910 }
1911 }
1912 }
1913 result
1914 }
1915
1916 pub fn subscribe_resp3<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
1920 check_resp3!(self.protocol);
1921 cmd("SUBSCRIBE")
1922 .arg(channel)
1923 .set_no_response(true)
1924 .exec(self)
1925 }
1926
1927 pub fn psubscribe_resp3<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
1931 check_resp3!(self.protocol);
1932 cmd("PSUBSCRIBE")
1933 .arg(pchannel)
1934 .set_no_response(true)
1935 .exec(self)
1936 }
1937
1938 pub fn unsubscribe_resp3<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
1942 check_resp3!(self.protocol);
1943 cmd("UNSUBSCRIBE")
1944 .arg(channel)
1945 .set_no_response(true)
1946 .exec(self)
1947 }
1948
1949 pub fn punsubscribe_resp3<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
1953 check_resp3!(self.protocol);
1954 cmd("PUNSUBSCRIBE")
1955 .arg(pchannel)
1956 .set_no_response(true)
1957 .exec(self)
1958 }
1959}
1960
1961impl ConnectionLike for Connection {
1962 fn req_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
1964 let pcmd = cmd.get_packed_command();
1965 if self.pubsub {
1966 self.exit_pubsub()?;
1967 }
1968
1969 self.send_bytes(&pcmd)?;
1970 if cmd.is_no_response() {
1971 return Ok(Value::Nil);
1972 }
1973 loop {
1974 match self.read(true)? {
1975 Value::Push {
1976 kind: _kind,
1977 data: _data,
1978 } => continue,
1979 val => return Ok(val),
1980 }
1981 }
1982 }
1983 fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
1984 if self.pubsub {
1985 self.exit_pubsub()?;
1986 }
1987
1988 self.send_bytes(cmd)?;
1989 loop {
1990 match self.read(true)? {
1991 Value::Push {
1992 kind: _kind,
1993 data: _data,
1994 } => continue,
1995 val => return Ok(val),
1996 }
1997 }
1998 }
1999
2000 fn req_packed_commands(
2001 &mut self,
2002 cmd: &[u8],
2003 offset: usize,
2004 count: usize,
2005 ) -> RedisResult<Vec<Value>> {
2006 if self.pubsub {
2007 self.exit_pubsub()?;
2008 }
2009 self.send_bytes(cmd)?;
2010 let mut rv = vec![];
2011 let mut first_err = None;
2012 let mut server_errors = vec![];
2013 let mut count = count;
2014 let mut idx = 0;
2015 while idx < (offset + count) {
2016 let response = self.read(true);
2021 match response {
2022 Ok(Value::ServerError(err)) => {
2023 if idx < offset {
2024 server_errors.push((idx - 1, err)); } else {
2026 rv.push(Value::ServerError(err));
2027 }
2028 }
2029 Ok(item) => {
2030 if let Value::Push {
2032 kind: _kind,
2033 data: _data,
2034 } = item
2035 {
2036 count += 1;
2038 } else if idx >= offset {
2039 rv.push(item);
2040 }
2041 }
2042 Err(err) => {
2043 if first_err.is_none() {
2044 first_err = Some(err);
2045 }
2046 }
2047 }
2048 idx += 1;
2049 }
2050
2051 if !server_errors.is_empty() {
2052 return Err(RedisError::make_aborted_transaction(server_errors));
2053 }
2054
2055 first_err.map_or(Ok(rv), Err)
2056 }
2057
2058 fn get_db(&self) -> i64 {
2059 self.db
2060 }
2061
2062 fn check_connection(&mut self) -> bool {
2063 cmd("PING").query::<String>(self).is_ok()
2064 }
2065
2066 fn is_open(&self) -> bool {
2067 self.con.is_open()
2068 }
2069}
2070
2071impl<C, T> ConnectionLike for T
2072where
2073 C: ConnectionLike,
2074 T: DerefMut<Target = C>,
2075{
2076 fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
2077 self.deref_mut().req_packed_command(cmd)
2078 }
2079
2080 fn req_packed_commands(
2081 &mut self,
2082 cmd: &[u8],
2083 offset: usize,
2084 count: usize,
2085 ) -> RedisResult<Vec<Value>> {
2086 self.deref_mut().req_packed_commands(cmd, offset, count)
2087 }
2088
2089 fn req_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
2090 self.deref_mut().req_command(cmd)
2091 }
2092
2093 fn get_db(&self) -> i64 {
2094 self.deref().get_db()
2095 }
2096
2097 fn supports_pipelining(&self) -> bool {
2098 self.deref().supports_pipelining()
2099 }
2100
2101 fn check_connection(&mut self) -> bool {
2102 self.deref_mut().check_connection()
2103 }
2104
2105 fn is_open(&self) -> bool {
2106 self.deref().is_open()
2107 }
2108}
2109
2110impl<'a> PubSub<'a> {
2132 fn new(con: &'a mut Connection) -> Self {
2133 Self {
2134 con,
2135 waiting_messages: VecDeque::new(),
2136 }
2137 }
2138
2139 fn cache_messages_until_received_response(
2140 &mut self,
2141 cmd: &mut Cmd,
2142 is_sub_unsub: bool,
2143 ) -> RedisResult<Value> {
2144 let ignore_response = self.con.protocol.supports_resp3() && is_sub_unsub;
2145 cmd.set_no_response(ignore_response);
2146
2147 self.con.send_packed_command(&cmd.get_packed_command())?;
2148
2149 loop {
2150 let response = self.con.recv_response()?;
2151 if let Some(msg) = Msg::from_value(&response) {
2152 self.waiting_messages.push_back(msg);
2153 } else {
2154 return Ok(response);
2155 }
2156 }
2157 }
2158
2159 pub fn subscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
2161 self.cache_messages_until_received_response(cmd("SUBSCRIBE").arg(channel), true)?;
2162 Ok(())
2163 }
2164
2165 pub fn psubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
2167 self.cache_messages_until_received_response(cmd("PSUBSCRIBE").arg(pchannel), true)?;
2168 Ok(())
2169 }
2170
2171 pub fn unsubscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
2173 self.cache_messages_until_received_response(cmd("UNSUBSCRIBE").arg(channel), true)?;
2174 Ok(())
2175 }
2176
2177 pub fn punsubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
2179 self.cache_messages_until_received_response(cmd("PUNSUBSCRIBE").arg(pchannel), true)?;
2180 Ok(())
2181 }
2182
2183 pub fn ping_message<T: FromRedisValue>(&mut self, message: impl ToRedisArgs) -> RedisResult<T> {
2185 Ok(from_redis_value(
2186 self.cache_messages_until_received_response(cmd("PING").arg(message), false)?,
2187 )?)
2188 }
2189 pub fn ping<T: FromRedisValue>(&mut self) -> RedisResult<T> {
2191 Ok(from_redis_value(
2192 self.cache_messages_until_received_response(&mut cmd("PING"), false)?,
2193 )?)
2194 }
2195
2196 pub fn get_message(&mut self) -> RedisResult<Msg> {
2203 if let Some(msg) = self.waiting_messages.pop_front() {
2204 return Ok(msg);
2205 }
2206 loop {
2207 if let Some(msg) = Msg::from_owned_value(self.con.read(false)?) {
2208 return Ok(msg);
2209 } else {
2210 continue;
2211 }
2212 }
2213 }
2214
2215 pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
2221 self.con.set_read_timeout(dur)
2222 }
2223}
2224
2225impl Drop for PubSub<'_> {
2226 fn drop(&mut self) {
2227 let _ = self.con.exit_pubsub();
2228 }
2229}
2230
2231impl Msg {
2234 pub fn from_value(value: &Value) -> Option<Self> {
2236 Self::from_owned_value(value.clone())
2237 }
2238
2239 pub fn from_owned_value(value: Value) -> Option<Self> {
2241 let mut pattern = None;
2242 let payload;
2243 let channel;
2244
2245 if let Value::Push { kind, data } = value {
2246 return Self::from_push_info(PushInfo { kind, data });
2247 } else {
2248 let raw_msg: Vec<Value> = from_redis_value(value).ok()?;
2249 let mut iter = raw_msg.into_iter();
2250 let msg_type: String = from_redis_value(iter.next()?).ok()?;
2251 if msg_type == "message" {
2252 channel = iter.next()?;
2253 payload = iter.next()?;
2254 } else if msg_type == "pmessage" {
2255 pattern = Some(iter.next()?);
2256 channel = iter.next()?;
2257 payload = iter.next()?;
2258 } else {
2259 return None;
2260 }
2261 };
2262 Some(Msg {
2263 payload,
2264 channel,
2265 pattern,
2266 })
2267 }
2268
2269 pub fn from_push_info(push_info: PushInfo) -> Option<Self> {
2271 let mut pattern = None;
2272 let payload;
2273 let channel;
2274
2275 let mut iter = push_info.data.into_iter();
2276 if push_info.kind == PushKind::Message || push_info.kind == PushKind::SMessage {
2277 channel = iter.next()?;
2278 payload = iter.next()?;
2279 } else if push_info.kind == PushKind::PMessage {
2280 pattern = Some(iter.next()?);
2281 channel = iter.next()?;
2282 payload = iter.next()?;
2283 } else {
2284 return None;
2285 }
2286
2287 Some(Msg {
2288 payload,
2289 channel,
2290 pattern,
2291 })
2292 }
2293
2294 pub fn get_channel<T: FromRedisValue>(&self) -> RedisResult<T> {
2296 Ok(from_redis_value_ref(&self.channel)?)
2297 }
2298
2299 pub fn get_channel_name(&self) -> &str {
2304 match self.channel {
2305 Value::BulkString(ref bytes) => from_utf8(bytes).unwrap_or("?"),
2306 _ => "?",
2307 }
2308 }
2309
2310 pub fn get_payload<T: FromRedisValue>(&self) -> RedisResult<T> {
2312 Ok(from_redis_value_ref(&self.payload)?)
2313 }
2314
2315 pub fn get_payload_bytes(&self) -> &[u8] {
2319 match self.payload {
2320 Value::BulkString(ref bytes) => bytes,
2321 _ => b"",
2322 }
2323 }
2324
2325 #[allow(clippy::wrong_self_convention)]
2328 pub fn from_pattern(&self) -> bool {
2329 self.pattern.is_some()
2330 }
2331
2332 pub fn get_pattern<T: FromRedisValue>(&self) -> RedisResult<T> {
2337 Ok(match self.pattern {
2338 None => from_redis_value_ref(&Value::Nil),
2339 Some(ref x) => from_redis_value_ref(x),
2340 }?)
2341 }
2342}
2343
2344pub fn transaction<
2377 C: ConnectionLike,
2378 K: ToRedisArgs,
2379 T,
2380 F: FnMut(&mut C, &mut Pipeline) -> RedisResult<Option<T>>,
2381>(
2382 con: &mut C,
2383 keys: &[K],
2384 func: F,
2385) -> RedisResult<T> {
2386 let mut func = func;
2387 loop {
2388 cmd("WATCH").arg(keys).exec(con)?;
2389 let mut p = pipe();
2390 let response: Option<T> = func(con, p.atomic())?;
2391 match response {
2392 None => {
2393 continue;
2394 }
2395 Some(response) => {
2396 cmd("UNWATCH").exec(con)?;
2399 return Ok(response);
2400 }
2401 }
2402 }
2403}
2404pub fn resp2_is_pub_sub_state_cleared(
2408 received_unsub: &mut bool,
2409 received_punsub: &mut bool,
2410 kind: &[u8],
2411 num: isize,
2412) -> bool {
2413 match kind.first() {
2414 Some(&b'u') => *received_unsub = true,
2415 Some(&b'p') => *received_punsub = true,
2416 _ => (),
2417 };
2418 *received_unsub && *received_punsub && num == 0
2419}
2420
2421pub fn resp3_is_pub_sub_state_cleared(
2423 received_unsub: &mut bool,
2424 received_punsub: &mut bool,
2425 kind: &PushKind,
2426 num: isize,
2427) -> bool {
2428 match kind {
2429 PushKind::Unsubscribe => *received_unsub = true,
2430 PushKind::PUnsubscribe => *received_punsub = true,
2431 _ => (),
2432 };
2433 *received_unsub && *received_punsub && num == 0
2434}
2435
2436pub fn no_sub_err_is_pub_sub_state_cleared(
2437 received_unsub: &mut bool,
2438 received_punsub: &mut bool,
2439 err: &ServerError,
2440) -> bool {
2441 let details = err.details();
2442 *received_unsub = *received_unsub
2443 || details
2444 .map(|details| details.starts_with("'unsub"))
2445 .unwrap_or_default();
2446 *received_punsub = *received_punsub
2447 || details
2448 .map(|details| details.starts_with("'punsub"))
2449 .unwrap_or_default();
2450 *received_unsub && *received_punsub
2451}
2452
2453pub fn get_resp3_hello_command_error(err: RedisError) -> RedisError {
2455 if let Some(detail) = err.detail() {
2456 if detail.starts_with("unknown command `HELLO`") {
2457 return (
2458 ErrorKind::RESP3NotSupported,
2459 "Redis Server doesn't support HELLO command therefore resp3 cannot be used",
2460 )
2461 .into();
2462 }
2463 }
2464 err
2465}
2466
2467#[cfg(test)]
2468mod tests {
2469 mod util {
2470 use crate::connection::connection_setup_pipeline;
2471 use crate::{RedisConnectionInfo, cmd};
2472
2473 pub fn assert_lib_name_in_connection_setup_pipeline(
2475 redis_connection_info: &RedisConnectionInfo,
2476 expected_lib_name: &str,
2477 expected_lib_ver: &str,
2478 ) {
2479 let pipeline = connection_setup_pipeline(
2481 redis_connection_info,
2482 false,
2483 #[cfg(feature = "cache-aio")]
2484 None,
2485 )
2486 .0;
2487
2488 let actual_packed_cmds = pipeline
2489 .commands
2490 .iter()
2491 .map(|c| c.get_packed_command())
2492 .collect::<Vec<_>>();
2493
2494 let expected_lib_name_packed_cmd = cmd("CLIENT")
2495 .arg("SETINFO")
2496 .arg("LIB-NAME")
2497 .arg(expected_lib_name)
2498 .get_packed_command();
2499 assert!(actual_packed_cmds.contains(&expected_lib_name_packed_cmd));
2500
2501 let expected_lib_ver_packed_cmd = cmd("CLIENT")
2502 .arg("SETINFO")
2503 .arg("LIB-VER")
2504 .arg(expected_lib_ver)
2505 .get_packed_command();
2506 assert!(actual_packed_cmds.contains(&expected_lib_ver_packed_cmd));
2507 }
2508 }
2509
2510 use super::*;
2511 use util::assert_lib_name_in_connection_setup_pipeline;
2512
2513 #[test]
2514 fn test_parse_redis_url() {
2515 let cases = vec![
2516 ("redis://127.0.0.1", true),
2517 ("redis://[::1]", true),
2518 ("rediss://127.0.0.1", true),
2519 ("rediss://[::1]", true),
2520 ("valkey://127.0.0.1", true),
2521 ("valkey://[::1]", true),
2522 ("valkeys://127.0.0.1", true),
2523 ("valkeys://[::1]", true),
2524 ("redis+unix:///run/redis.sock", true),
2525 ("valkey+unix:///run/valkey.sock", true),
2526 ("unix:///run/redis.sock", true),
2527 ("http://127.0.0.1", false),
2528 ("tcp://127.0.0.1", false),
2529 ];
2530 for (url, expected) in cases.into_iter() {
2531 let res = parse_redis_url(url);
2532 assert_eq!(
2533 res.is_some(),
2534 expected,
2535 "Parsed result of `{url}` is not expected",
2536 );
2537 }
2538 }
2539
2540 #[test]
2541 fn test_url_to_tcp_connection_info() {
2542 let cases = vec![
2543 (
2544 url::Url::parse("redis://127.0.0.1").unwrap(),
2545 ConnectionInfo {
2546 addr: ConnectionAddr::Tcp("127.0.0.1".to_string(), 6379),
2547 redis: Default::default(),
2548 tcp_settings: TcpSettings::default(),
2549 },
2550 ),
2551 (
2552 url::Url::parse("redis://[::1]").unwrap(),
2553 ConnectionInfo {
2554 addr: ConnectionAddr::Tcp("::1".to_string(), 6379),
2555 redis: Default::default(),
2556 tcp_settings: TcpSettings::default(),
2557 },
2558 ),
2559 (
2560 url::Url::parse("redis://%25johndoe%25:%23%40%3C%3E%24@example.com/2").unwrap(),
2561 ConnectionInfo {
2562 addr: ConnectionAddr::Tcp("example.com".to_string(), 6379),
2563 redis: RedisConnectionInfo {
2564 db: 2,
2565 username: Some("%johndoe%".into()),
2566 password: Some("#@<>$".into()),
2567 protocol: ProtocolVersion::RESP2,
2568 skip_set_lib_name: false,
2569 lib_name: None,
2570 lib_ver: None,
2571 },
2572 tcp_settings: TcpSettings::default(),
2573 },
2574 ),
2575 (
2576 url::Url::parse("redis://127.0.0.1/?protocol=2").unwrap(),
2577 ConnectionInfo {
2578 addr: ConnectionAddr::Tcp("127.0.0.1".to_string(), 6379),
2579 redis: Default::default(),
2580 tcp_settings: TcpSettings::default(),
2581 },
2582 ),
2583 (
2584 url::Url::parse("redis://127.0.0.1/?protocol=resp3").unwrap(),
2585 ConnectionInfo {
2586 addr: ConnectionAddr::Tcp("127.0.0.1".to_string(), 6379),
2587 redis: RedisConnectionInfo {
2588 db: 0,
2589 username: None,
2590 password: None,
2591 protocol: ProtocolVersion::RESP3,
2592 skip_set_lib_name: false,
2593 lib_name: None,
2594 lib_ver: None,
2595 },
2596 tcp_settings: TcpSettings::default(),
2597 },
2598 ),
2599 ];
2600 for (url, expected) in cases.into_iter() {
2601 let res = url_to_tcp_connection_info(url.clone()).unwrap();
2602 assert_eq!(res.addr, expected.addr, "addr of {url} is not expected");
2603 assert_eq!(
2604 res.redis.db, expected.redis.db,
2605 "db of {url} is not expected",
2606 );
2607 assert_eq!(
2608 res.redis.username, expected.redis.username,
2609 "username of {url} is not expected",
2610 );
2611 assert_eq!(
2612 res.redis.password, expected.redis.password,
2613 "password of {url} is not expected",
2614 );
2615 }
2616 }
2617
2618 #[test]
2619 fn test_url_to_tcp_connection_info_failed() {
2620 let cases = vec![
2621 (
2622 url::Url::parse("redis://").unwrap(),
2623 "Missing hostname",
2624 None,
2625 ),
2626 (
2627 url::Url::parse("redis://127.0.0.1/db").unwrap(),
2628 "Invalid database number",
2629 None,
2630 ),
2631 (
2632 url::Url::parse("redis://C3%B0@127.0.0.1").unwrap(),
2633 "Username is not valid UTF-8 string",
2634 None,
2635 ),
2636 (
2637 url::Url::parse("redis://:C3%B0@127.0.0.1").unwrap(),
2638 "Password is not valid UTF-8 string",
2639 None,
2640 ),
2641 (
2642 url::Url::parse("redis://127.0.0.1/?protocol=4").unwrap(),
2643 "Invalid protocol version",
2644 Some("4"),
2645 ),
2646 ];
2647 for (url, expected, detail) in cases.into_iter() {
2648 let res = url_to_tcp_connection_info(url).unwrap_err();
2649 assert_eq!(res.kind(), crate::ErrorKind::InvalidClientConfig,);
2650 let desc = res.to_string();
2651 assert!(desc.contains(expected), "{desc}");
2652 assert_eq!(res.detail(), detail);
2653 }
2654 }
2655
2656 #[test]
2657 #[cfg(unix)]
2658 fn test_url_to_unix_connection_info() {
2659 let cases = vec![
2660 (
2661 url::Url::parse("unix:///var/run/redis.sock").unwrap(),
2662 ConnectionInfo {
2663 addr: ConnectionAddr::Unix("/var/run/redis.sock".into()),
2664 redis: RedisConnectionInfo {
2665 db: 0,
2666 username: None,
2667 password: None,
2668 protocol: ProtocolVersion::RESP2,
2669 skip_set_lib_name: false,
2670 lib_name: None,
2671 lib_ver: None,
2672 },
2673 tcp_settings: Default::default(),
2674 },
2675 ),
2676 (
2677 url::Url::parse("redis+unix:///var/run/redis.sock?db=1").unwrap(),
2678 ConnectionInfo {
2679 addr: ConnectionAddr::Unix("/var/run/redis.sock".into()),
2680 redis: RedisConnectionInfo {
2681 db: 1,
2682 username: None,
2683 password: None,
2684 protocol: ProtocolVersion::RESP2,
2685 skip_set_lib_name: false,
2686 lib_name: None,
2687 lib_ver: None,
2688 },
2689 tcp_settings: TcpSettings::default(),
2690 },
2691 ),
2692 (
2693 url::Url::parse(
2694 "unix:///example.sock?user=%25johndoe%25&pass=%23%40%3C%3E%24&db=2",
2695 )
2696 .unwrap(),
2697 ConnectionInfo {
2698 addr: ConnectionAddr::Unix("/example.sock".into()),
2699 redis: RedisConnectionInfo {
2700 db: 2,
2701 username: Some("%johndoe%".into()),
2702 password: Some("#@<>$".into()),
2703 protocol: ProtocolVersion::RESP2,
2704 skip_set_lib_name: false,
2705 lib_name: None,
2706 lib_ver: None,
2707 },
2708 tcp_settings: TcpSettings::default(),
2709 },
2710 ),
2711 (
2712 url::Url::parse(
2713 "redis+unix:///example.sock?pass=%26%3F%3D+%2A%2B&db=2&user=%25johndoe%25",
2714 )
2715 .unwrap(),
2716 ConnectionInfo {
2717 addr: ConnectionAddr::Unix("/example.sock".into()),
2718 redis: RedisConnectionInfo {
2719 db: 2,
2720 username: Some("%johndoe%".into()),
2721 password: Some("&?= *+".into()),
2722 protocol: ProtocolVersion::RESP2,
2723 skip_set_lib_name: false,
2724 lib_name: None,
2725 lib_ver: None,
2726 },
2727 tcp_settings: TcpSettings::default(),
2728 },
2729 ),
2730 (
2731 url::Url::parse("redis+unix:///var/run/redis.sock?protocol=3").unwrap(),
2732 ConnectionInfo {
2733 addr: ConnectionAddr::Unix("/var/run/redis.sock".into()),
2734 redis: RedisConnectionInfo {
2735 db: 0,
2736 username: None,
2737 password: None,
2738 protocol: ProtocolVersion::RESP3,
2739 skip_set_lib_name: false,
2740 lib_name: None,
2741 lib_ver: None,
2742 },
2743 tcp_settings: TcpSettings::default(),
2744 },
2745 ),
2746 ];
2747 for (url, expected) in cases.into_iter() {
2748 assert_eq!(
2749 ConnectionAddr::Unix(url.to_file_path().unwrap()),
2750 expected.addr,
2751 "addr of {url} is not expected",
2752 );
2753 let res = url_to_unix_connection_info(url.clone()).unwrap();
2754 assert_eq!(res.addr, expected.addr, "addr of {url} is not expected");
2755 assert_eq!(
2756 res.redis.db, expected.redis.db,
2757 "db of {url} is not expected",
2758 );
2759 assert_eq!(
2760 res.redis.username, expected.redis.username,
2761 "username of {url} is not expected",
2762 );
2763 assert_eq!(
2764 res.redis.password, expected.redis.password,
2765 "password of {url} is not expected",
2766 );
2767 }
2768 }
2769
2770 #[test]
2771 fn redis_connection_info_lib_name_default() {
2772 let redis_connection_info = RedisConnectionInfo::default();
2773
2774 assert_eq!(redis_connection_info.lib_name(), None);
2776 assert_eq!(redis_connection_info.lib_ver(), None);
2777
2778 assert_lib_name_in_connection_setup_pipeline(
2780 &redis_connection_info,
2781 DEFAULT_CLIENT_SETINFO_LIB_NAME,
2782 DEFAULT_CLIENT_SETINFO_LIB_VER,
2783 );
2784 }
2785
2786 #[test]
2787 fn redis_connection_info_lib_name_custom() {
2788 let mut redis_connection_info = RedisConnectionInfo::default();
2789
2790 redis_connection_info = redis_connection_info.set_skip_set_lib_name();
2792 assert!(redis_connection_info.skip_set_lib_name());
2793
2794 redis_connection_info = redis_connection_info.set_lib_name("foo", "42.4711");
2796
2797 assert!(!redis_connection_info.skip_set_lib_name());
2799 assert_eq!(redis_connection_info.lib_name(), Some("foo"));
2800 assert_eq!(redis_connection_info.lib_ver(), Some("42.4711"));
2801
2802 assert_lib_name_in_connection_setup_pipeline(&redis_connection_info, "foo", "42.4711");
2804 }
2805}