redis/cluster_async/mod.rs

//! This module provides async functionality for connecting to Redis/Valkey clusters.
//!
//! The cluster connection is meant to abstract the fact that a cluster is composed of multiple nodes,
//! and to provide an API which is as close as possible to that of a single node connection. In order to do that,
//! the cluster connection maintains connections to each node in the Redis/Valkey cluster, and can route
//! requests automatically to the relevant nodes. When the cluster connection receives indications
//! that the cluster topology has changed, it will query nodes in order to find the current cluster topology.
//! If it disconnects from some nodes, it will automatically reconnect to those nodes.
//!
//! By default, [`ClusterConnection`] makes use of [`MultiplexedConnection`] and maintains a pool
//! of connections to each node in the cluster.
//!
//! # Example
//! ```rust,no_run
//! use redis::cluster::ClusterClient;
//! use redis::AsyncCommands;
//!
//! async fn fetch_an_integer() -> String {
//!     let nodes = vec!["redis://127.0.0.1/"];
//!     let client = ClusterClient::new(nodes).unwrap();
//!     let mut connection = client.get_async_connection().await.unwrap();
//!     let _: () = connection.set("test", "test_data").await.unwrap();
//!     let rv: String = connection.get("test").await.unwrap();
//!     rv
//! }
//! ```
//!
//! # Pipelining
//! ```rust,no_run
//! use redis::cluster::ClusterClient;
//! use redis::{Value, AsyncCommands};
//!
//! async fn fetch_an_integer() -> redis::RedisResult<()> {
//!     let nodes = vec!["redis://127.0.0.1/"];
//!     let client = ClusterClient::new(nodes).unwrap();
//!     let mut connection = client.get_async_connection().await.unwrap();
//!     let key = "test";
//!
//!     redis::pipe()
//!         .rpush(key, "123").ignore()
//!         .ltrim(key, -10, -1).ignore()
//!         .expire(key, 60).ignore()
//!         .exec_async(&mut connection).await
//! }
//! ```
//!
//! # Pubsub
//!
//! Pubsub, and receiving push messages from the cluster nodes in general, is supported
//! when the connection is defined with [crate::ProtocolVersion::RESP3] and some
//! [crate::aio::AsyncPushSender] to receive the messages on.
//!
//! ```rust,no_run
//! use redis::cluster::ClusterClientBuilder;
//! use redis::{Value, AsyncCommands};
//!
//! async fn fetch_an_integer() -> redis::RedisResult<()> {
//!     let nodes = vec!["redis://127.0.0.1/?protocol=3"];
//!     let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
//!     let client = ClusterClientBuilder::new(nodes)
//!         .use_protocol(redis::ProtocolVersion::RESP3)
//!         .push_sender(tx).build()?;
//!     let mut connection = client.get_async_connection().await?;
//!     connection.subscribe("channel").await?;
//!     while let Some(msg) = rx.recv().await {
//!         println!("Got: {:?}", msg);
//!     }
//!     Ok(())
//! }
//! ```
//!
//! # Sending a request to a specific node
//! In some cases you'd want to send a request to a specific node in the cluster, instead of
//! letting the cluster connection decide which node to send it to. This can happen, for
//! example, if you want to send SCAN commands to each node in the cluster.
//!
//! ```rust,no_run
//! use redis::cluster::ClusterClient;
//! use redis::{Value, AsyncCommands};
//! use redis::cluster_routing::{ RoutingInfo, SingleNodeRoutingInfo };
//!
//! async fn fetch_an_integer() -> redis::RedisResult<Value> {
//!     let nodes = vec!["redis://127.0.0.1/"];
//!     let client = ClusterClient::new(nodes)?;
//!     let mut connection = client.get_async_connection().await?;
//!     let routing_info = RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress{
//!         host: "127.0.0.1".to_string(),
//!         port: 6378
//!     });
//!     connection.route_command(&redis::cmd("PING"), routing_info).await
//! }
//! ```
use std::{
    collections::HashMap,
    fmt,
    future::Future,
    io, mem,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{self, Poll},
    time::Duration,
};

mod request;
mod routing;
use crate::{
    aio::{check_resp3, ConnectionLike, HandleContainer, MultiplexedConnection, Runtime},
    cluster::{get_connection_info, slot_cmd},
    cluster_client::ClusterParams,
    cluster_routing::{
        MultipleNodeRoutingInfo, Redirect, ResponsePolicy, RoutingInfo, SingleNodeRoutingInfo,
        Slot, SlotMap,
    },
    cluster_topology::parse_slots,
    cmd,
    subscription_tracker::SubscriptionTracker,
    types::closed_connection_error,
    AsyncConnectionConfig, Cmd, ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError,
    RedisFuture, RedisResult, ToRedisArgs, Value,
};

#[cfg(feature = "cache-aio")]
use crate::caching::{CacheManager, CacheStatistics};
use crate::ProtocolVersion;
use futures_sink::Sink;
use futures_util::{
    future::{self, BoxFuture, FutureExt},
    ready,
    stream::{self, Stream, StreamExt},
};
use log::{debug, trace, warn};
use rand::{rng, seq::IteratorRandom};
use request::{CmdArg, PendingRequest, Request, RequestState, Retry};
use routing::{route_for_pipeline, InternalRoutingInfo, InternalSingleNodeRouting};
use tokio::sync::{mpsc, oneshot, RwLock};
struct ClientSideState {
    protocol: ProtocolVersion,
    _task_handle: HandleContainer,
    response_timeout: Option<Duration>,
    runtime: Runtime,
    #[cfg(feature = "cache-aio")]
    cache_manager: Option<CacheManager>,
}

/// This represents an async Redis Cluster connection.
///
/// It stores the underlying connections maintained for each node in the cluster,
/// as well as common parameters for connecting to nodes and executing commands.
#[derive(Clone)]
pub struct ClusterConnection<C = MultiplexedConnection> {
    state: Arc<ClientSideState>,
    sender: mpsc::Sender<Message<C>>,
}

impl<C> ClusterConnection<C>
where
    C: ConnectionLike + Connect + Clone + Send + Sync + Unpin + 'static,
{
    pub(crate) async fn new(
        initial_nodes: &[ConnectionInfo],
        cluster_params: ClusterParams,
    ) -> RedisResult<ClusterConnection<C>> {
        let protocol = cluster_params.protocol.unwrap_or_default();
        let response_timeout = cluster_params.response_timeout;
        #[cfg(feature = "cache-aio")]
        let cache_manager = cluster_params.cache_manager.clone();
        let runtime = Runtime::locate();
        ClusterConnInner::new(initial_nodes, cluster_params)
            .await
            .map(|inner| {
                let (sender, mut receiver) = mpsc::channel::<Message<_>>(100);
                let stream = async move {
                    let _ = stream::poll_fn(move |cx| receiver.poll_recv(cx))
                        .map(Ok)
                        .forward(inner)
                        .await;
                };
                let _task_handle = HandleContainer::new(runtime.spawn(stream));

                ClusterConnection {
                    sender,
                    state: Arc::new(ClientSideState {
                        protocol,
                        _task_handle,
                        response_timeout,
                        runtime,
                        #[cfg(feature = "cache-aio")]
                        cache_manager,
                    }),
                }
            })
    }

    /// Send a command to the given `routing`, aggregating multi-node responses according to the routing's response policy.
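    ///
    /// # Example
    /// A minimal, illustrative sketch (assuming a cluster node is reachable at
    /// `redis://127.0.0.1/`; the helper function name is hypothetical) that routes a
    /// `PING` to a random node:
    /// ```rust,no_run
    /// use redis::cluster::ClusterClient;
    /// use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo};
    ///
    /// async fn ping_random_node() -> redis::RedisResult<redis::Value> {
    ///     let client = ClusterClient::new(vec!["redis://127.0.0.1/"])?;
    ///     let mut connection = client.get_async_connection().await?;
    ///     // Let the connection pick any node for this command.
    ///     let routing = RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random);
    ///     connection.route_command(&redis::cmd("PING"), routing).await
    /// }
    /// ```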
    pub async fn route_command(&mut self, cmd: &Cmd, routing: RoutingInfo) -> RedisResult<Value> {
        trace!("route_command");
        let (sender, receiver) = oneshot::channel();
        let request = async {
            self.sender
                .send(Message {
                    cmd: CmdArg::Cmd {
                        cmd: Arc::new(cmd.clone()), // TODO Remove this clone?
                        routing: routing.into(),
                    },
                    sender,
                })
                .await
                .map_err(|_| {
                    RedisError::from(io::Error::new(
                        io::ErrorKind::BrokenPipe,
                        "redis_cluster: Unable to send command",
                    ))
                })?;

            receiver
                .await
                .unwrap_or_else(|_| {
                    Err(RedisError::from(io::Error::new(
                        io::ErrorKind::BrokenPipe,
                        "redis_cluster: Unable to receive command",
                    )))
                })
                .map(|response| match response {
                    Response::Single(value) => value,
                    Response::Multiple(_) => unreachable!(),
                })
        };

        match self.state.response_timeout {
            Some(duration) => self.state.runtime.timeout(duration, request).await?,
            None => request.await,
        }
    }

    /// Send commands in `pipeline` to the given `route`. To send the pipeline to a random node, pass [SingleNodeRoutingInfo::Random].
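    ///
    /// # Example
    /// A minimal, illustrative sketch (assuming a cluster node is reachable at
    /// `redis://127.0.0.1/`; the helper function name is hypothetical) sending a
    /// two-command pipeline to a random node:
    /// ```rust,no_run
    /// use redis::cluster::ClusterClient;
    /// use redis::cluster_routing::SingleNodeRoutingInfo;
    ///
    /// async fn ping_twice() -> redis::RedisResult<Vec<redis::Value>> {
    ///     let client = ClusterClient::new(vec!["redis://127.0.0.1/"])?;
    ///     let mut connection = client.get_async_connection().await?;
    ///     let mut pipe = redis::pipe();
    ///     pipe.cmd("PING").cmd("PING");
    ///     // offset 0, count 2: expect one response for each of the two commands.
    ///     connection.route_pipeline(&pipe, 0, 2, SingleNodeRoutingInfo::Random).await
    /// }
    /// ```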
    pub async fn route_pipeline<'a>(
        &'a mut self,
        pipeline: &'a crate::Pipeline,
        offset: usize,
        count: usize,
        route: SingleNodeRoutingInfo,
    ) -> RedisResult<Vec<Value>> {
        let (sender, receiver) = oneshot::channel();

        let request = async {
            self.sender
                .send(Message {
                    cmd: CmdArg::Pipeline {
                        pipeline: Arc::new(pipeline.clone()), // TODO Remove this clone?
                        offset,
                        count,
                        route: route.into(),
                    },
                    sender,
                })
                .await
                .map_err(|_| closed_connection_error())?;
            receiver
                .await
                .unwrap_or_else(|_| Err(closed_connection_error()))
                .map(|response| match response {
                    Response::Multiple(values) => values,
                    Response::Single(_) => unreachable!(),
                })
        };

        match self.state.response_timeout {
            Some(duration) => self.state.runtime.timeout(duration, request).await?,
            None => request.await,
        }
    }

    /// Subscribes to new channel(s).
    ///
    /// Updates from the sender will be sent on the push sender that was passed to the manager.
    /// If the manager was configured without a push sender, the connection won't be able to pass messages back to the user.
    ///
    /// This method is only available when the connection is using the RESP3 protocol, and will return an error otherwise.
    /// It should be noted that the connection will automatically resubscribe after disconnections, so the user might
    /// receive additional pushes with [crate::PushKind::Subscribe] after the subscription has completed.
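    ///
    /// # Example
    /// A minimal sketch, assuming a connection built with RESP3 and a push sender as in the
    /// module-level Pubsub example (the helper function here is hypothetical):
    /// ```rust,no_run
    /// # async fn do_subscribe(connection: &mut redis::cluster_async::ClusterConnection) -> redis::RedisResult<()> {
    /// connection.subscribe("channel_1").await?;
    /// connection.subscribe(vec!["channel_2", "channel_3"]).await?;
    /// # Ok(())
    /// # }
    /// ```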
    pub async fn subscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
        check_resp3!(self.state.protocol);
        let mut cmd = cmd("SUBSCRIBE");
        cmd.arg(channel_name);
        cmd.exec_async(self).await?;
        Ok(())
    }

    /// Unsubscribes from channel(s).
    ///
    /// This method is only available when the connection is using the RESP3 protocol, and will return an error otherwise.
    pub async fn unsubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
        check_resp3!(self.state.protocol);
        let mut cmd = cmd("UNSUBSCRIBE");
        cmd.arg(channel_name);
        cmd.exec_async(self).await?;
        Ok(())
    }

    /// Subscribes to new channel(s) with pattern(s).
    ///
    /// Updates from the sender will be sent on the push sender that was passed to the manager.
    /// If the manager was configured without a push sender, the manager won't be able to pass messages back to the user.
    ///
    /// This method is only available when the connection is using the RESP3 protocol, and will return an error otherwise.
    /// It should be noted that the connection will automatically resubscribe after disconnections, so the user might
    /// receive additional pushes with [crate::PushKind::PSubscribe] after the subscription has completed.
    pub async fn psubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> {
        check_resp3!(self.state.protocol);
        let mut cmd = cmd("PSUBSCRIBE");
        cmd.arg(channel_pattern);
        cmd.exec_async(self).await?;
        Ok(())
    }

    /// Unsubscribes from channel pattern(s).
    ///
    /// This method is only available when the connection is using the RESP3 protocol, and will return an error otherwise.
    pub async fn punsubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> {
        check_resp3!(self.state.protocol);
        let mut cmd = cmd("PUNSUBSCRIBE");
        cmd.arg(channel_pattern);
        cmd.exec_async(self).await?;
        Ok(())
    }

    /// Subscribes to new sharded channel(s).
    ///
    /// Updates from the sender will be sent on the push sender that was passed to the manager.
    /// If the manager was configured without a push sender, the manager won't be able to pass messages back to the user.
    ///
    /// This method is only available when the connection is using the RESP3 protocol, and will return an error otherwise.
    /// It should be noted that the connection will automatically resubscribe after disconnections, so the user might
    /// receive additional pushes with [crate::PushKind::SSubscribe] after the subscription has completed.
    pub async fn ssubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
        check_resp3!(self.state.protocol);
        let mut cmd = cmd("SSUBSCRIBE");
        cmd.arg(channel_name);
        cmd.exec_async(self).await?;
        Ok(())
    }

    /// Unsubscribes from sharded channel(s).
    ///
    /// This method is only available when the connection is using the RESP3 protocol, and will return an error otherwise.
    pub async fn sunsubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
        check_resp3!(self.state.protocol);
        let mut cmd = cmd("SUNSUBSCRIBE");
        cmd.arg(channel_name);
        cmd.exec_async(self).await?;
        Ok(())
    }

    /// Gets [`CacheStatistics`] for the cluster connection, if caching is enabled.
    #[cfg(feature = "cache-aio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
    pub fn get_cache_statistics(&self) -> Option<CacheStatistics> {
        self.state.cache_manager.as_ref().map(|cm| cm.statistics())
    }
}

type ConnectionMap<C> = HashMap<String, C>;

/// This is the internal representation of an async Redis Cluster connection. It stores the
/// underlying connections maintained for each node in the cluster, as well
/// as common parameters for connecting to nodes and executing commands.
struct InnerCore<C> {
    conn_lock: RwLock<(ConnectionMap<C>, SlotMap)>,
    cluster_params: ClusterParams,
    pending_requests: Mutex<Vec<PendingRequest<C>>>,
    initial_nodes: Vec<ConnectionInfo>,
    subscription_tracker: Option<Mutex<SubscriptionTracker>>,
}

type Core<C> = Arc<InnerCore<C>>;

/// This is the sink for requests sent by the user.
/// It holds the stream of requests which are "in flight", i.e. on their way to the server,
/// and the inner representation of the connection.
struct ClusterConnInner<C> {
    inner: Core<C>,
    state: ConnectionState,
    #[allow(clippy::complexity)]
    in_flight_requests: stream::FuturesUnordered<Pin<Box<Request<C>>>>,
    refresh_error: Option<RedisError>,
}

fn boxed_sleep(duration: Duration) -> BoxFuture<'static, ()> {
    Box::pin(Runtime::locate_and_sleep(duration))
}

#[derive(Debug, PartialEq)]
pub(crate) enum Response {
    Single(Value),
    Multiple(Vec<Value>),
}

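/// The target of a failed operation: a specific node, a route for which no
/// node was found, or a multi-node fan-out.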
enum OperationTarget {
    Node { address: String },
    NotFound,
    FanOut,
}
type OperationResult = Result<Response, (OperationTarget, RedisError)>;

impl From<String> for OperationTarget {
    fn from(address: String) -> Self {
        OperationTarget::Node { address }
    }
}

struct Message<C> {
    cmd: CmdArg<C>,
    sender: oneshot::Sender<RedisResult<Response>>,
}

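/// In-progress recovery work: either refreshing the slot map or reconnecting
/// to a set of nodes.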
enum RecoverFuture {
    RecoverSlots(BoxFuture<'static, RedisResult<()>>),
    Reconnect(BoxFuture<'static, ()>),
}

enum ConnectionState {
    PollComplete,
    Recover(RecoverFuture),
}

impl fmt::Debug for ConnectionState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}",
            match self {
                ConnectionState::PollComplete => "PollComplete",
                ConnectionState::Recover(_) => "Recover",
            }
        )
    }
}

impl<C> ClusterConnInner<C>
where
    C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
{
    async fn new(
        initial_nodes: &[ConnectionInfo],
        cluster_params: ClusterParams,
    ) -> RedisResult<Self> {
        let connections = Self::create_initial_connections(initial_nodes, &cluster_params).await?;
        let subscription_tracker = if cluster_params.async_push_sender.is_some() {
            Some(Mutex::new(SubscriptionTracker::default()))
        } else {
            None
        };
        let inner = Arc::new(InnerCore {
            conn_lock: RwLock::new((connections, SlotMap::new(cluster_params.read_from_replicas))),
            cluster_params,
            pending_requests: Mutex::new(Vec::new()),
            initial_nodes: initial_nodes.to_vec(),
            subscription_tracker,
        });
        let connection = ClusterConnInner {
            inner,
            in_flight_requests: Default::default(),
            refresh_error: None,
            state: ConnectionState::PollComplete,
        };
        Self::refresh_slots(connection.inner.clone()).await?;
        Ok(connection)
    }

    async fn create_initial_connections(
        initial_nodes: &[ConnectionInfo],
        params: &ClusterParams,
    ) -> RedisResult<ConnectionMap<C>> {
        let (connections, error) = stream::iter(initial_nodes.iter().cloned())
            .map(|info| {
                let params = params.clone();
                async move {
                    let addr = info.addr.to_string();
                    let result = connect_and_check(&addr, params).await;
                    match result {
                        Ok(conn) => Ok((addr, conn)),
                        Err(e) => {
                            debug!("Failed to connect to initial node: {e:?}");
                            Err(e)
                        }
                    }
                }
            })
            .buffer_unordered(initial_nodes.len())
            .fold(
                (ConnectionMap::<C>::with_capacity(initial_nodes.len()), None),
                |(mut connections, mut error), result| async move {
                    match result {
                        Ok((addr, conn)) => {
                            connections.insert(addr, conn);
                        }
                        Err(err) => {
                            // Store at least one error to use as detail in the connection error if
                            // all connections fail.
                            error = Some(err);
                        }
                    }
                    (connections, error)
                },
            )
            .await;
        if connections.is_empty() {
            if let Some(err) = error {
                return Err(RedisError::from((
                    ErrorKind::IoError,
                    "Failed to create initial connections",
                    err.to_string(),
                )));
            } else {
                return Err(RedisError::from((
                    ErrorKind::IoError,
                    "Failed to create initial connections",
                )));
            }
        }
        Ok(connections)
    }

    fn resubscribe(&self) {
        let Some(subscription_tracker) = self.inner.subscription_tracker.as_ref() else {
            return;
        };

        let subscription_pipe = subscription_tracker
            .lock()
            .unwrap()
            .get_subscription_pipeline();

        // We send one request per command instead of sending the whole pipeline at once,
        // so that each command is routed to its relevant node rather than all of them
        // going to a single node.
        let requests = subscription_pipe.cmd_iter().map(|cmd| {
            let routing = RoutingInfo::for_routable(cmd)
                .unwrap_or(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
                .into();
            PendingRequest {
                retry: 0,
                sender: request::ResultExpectation::Internal,
                cmd: CmdArg::Cmd {
                    cmd: Arc::new(cmd.clone()),
                    routing,
                },
            }
        });
        self.inner.pending_requests.lock().unwrap().extend(requests);
    }

    fn reconnect_to_initial_nodes(&mut self) -> impl Future<Output = ()> {
        debug!("Received request to reconnect to initial nodes");
        let inner = self.inner.clone();
        async move {
            let connection_map =
                match Self::create_initial_connections(&inner.initial_nodes, &inner.cluster_params)
                    .await
                {
                    Ok(map) => map,
                    Err(err) => {
                        warn!("Can't reconnect to initial nodes: `{err}`");
                        return;
                    }
                };
            let mut write_lock = inner.conn_lock.write().await;
            *write_lock = (
                connection_map,
                SlotMap::new(inner.cluster_params.read_from_replicas),
            );
            drop(write_lock);
            if let Err(err) = Self::refresh_slots(inner.clone()).await {
                warn!("Can't refresh slots with initial nodes: `{err}`");
            };
        }
    }

    fn refresh_connections(&mut self, addrs: Vec<String>) -> impl Future<Output = ()> {
        let inner = self.inner.clone();
        async move {
            let mut write_guard = inner.conn_lock.write().await;

            Self::refresh_connections_locked(&inner, &mut write_guard.0, addrs).await;
        }
    }

    // Query a node to discover slot -> master mappings.
    async fn refresh_slots(inner: Core<C>) -> RedisResult<()> {
        let mut write_guard = inner.conn_lock.write().await;
        let (connections, slots) = &mut *write_guard;

        let mut result = Ok(());
        for (addr, conn) in &mut *connections {
            result = async {
                let value = conn
                    .req_packed_command(&slot_cmd())
                    .await
                    .and_then(|value| value.extract_error())?;
                let v: Vec<Slot> = parse_slots(
                    value,
                    inner.cluster_params.tls,
                    addr.rsplit_once(':').unwrap().0,
                )?;
                Self::build_slot_map(slots, v)
            }
            .await;
            if result.is_ok() {
                break;
            }
        }
        result?;

        let mut nodes = slots.values().flatten().cloned().collect::<Vec<_>>();
        nodes.sort_unstable();
        nodes.dedup();
        Self::refresh_connections_locked(&inner, connections, nodes).await;

        Ok(())
    }

    async fn refresh_connections_locked(
        inner: &Core<C>,
        connections: &mut ConnectionMap<C>,
        nodes: Vec<String>,
    ) {
        let nodes_len = nodes.len();

        let addresses_and_connections_iter = nodes.into_iter().map(|addr| {
            let value = connections.remove(&addr);
            (addr, value)
        });

        let inner = &inner;
        *connections = stream::iter(addresses_and_connections_iter)
            .map(|(addr, connection)| async move {
                (
                    addr.clone(),
                    Self::get_or_create_conn(&addr, connection, &inner.cluster_params).await,
                )
            })
            .buffer_unordered(nodes_len.max(8))
            .fold(
                HashMap::with_capacity(nodes_len),
                |mut connections, (addr, result)| async move {
                    if let Ok(conn) = result {
                        connections.insert(addr, conn);
                    }
                    connections
                },
            )
            .await;
    }

    fn build_slot_map(slot_map: &mut SlotMap, slots_data: Vec<Slot>) -> RedisResult<()> {
        slot_map.clear();
        slot_map.fill_slots(slots_data);
        trace!("{slot_map:?}");
        Ok(())
    }

    async fn aggregate_results(
        receivers: Vec<(String, oneshot::Receiver<RedisResult<Response>>)>,
        routing: &MultipleNodeRoutingInfo,
        response_policy: Option<ResponsePolicy>,
    ) -> RedisResult<Value> {
        if receivers.is_empty() {
            return Err((
                ErrorKind::ClusterConnectionNotFound,
                "No nodes found for multi-node operation",
            )
                .into());
        }

        let extract_result = |response| match response {
            Response::Single(value) => value,
            Response::Multiple(_) => unreachable!(),
        };

        let convert_result = |res: Result<RedisResult<Response>, _>| {
            // The receiver fails only if the result sender is dropped before use.
            res.map_err(|_| {
                RedisError::from((
                    ErrorKind::ResponseError,
                    "request wasn't handled due to internal failure",
                ))
            })
            .and_then(|res| res.map(extract_result))
        };

        let get_receiver = |(_, receiver): (_, oneshot::Receiver<RedisResult<Response>>)| async {
            convert_result(receiver.await)
        };

        // TODO - once Value::Error is merged, these will need to be updated to handle the new value.
        match response_policy {
            Some(ResponsePolicy::AllSucceeded) => {
                future::try_join_all(receivers.into_iter().map(get_receiver))
                    .await
                    .and_then(|mut results| {
                        results.pop().ok_or(
                            (
                                ErrorKind::ClusterConnectionNotFound,
                                "No results received for multi-node operation",
                            )
                                .into(),
                        )
                    })
            }
            Some(ResponsePolicy::OneSucceeded) => future::select_ok(
                receivers
                    .into_iter()
                    .map(|tuple| Box::pin(get_receiver(tuple))),
            )
            .await
            .map(|(result, _)| result),
            Some(ResponsePolicy::OneSucceededNonEmpty) => {
                future::select_ok(receivers.into_iter().map(|(_, receiver)| {
                    Box::pin(async move {
                        let result = convert_result(receiver.await)?;
                        match result {
                            Value::Nil => Err((ErrorKind::ResponseError, "no value found").into()),
                            _ => Ok(result),
                        }
                    })
                }))
                .await
                .map(|(result, _)| result)
            }
            Some(ResponsePolicy::Aggregate(op)) => {
                future::try_join_all(receivers.into_iter().map(get_receiver))
                    .await
                    .and_then(|results| crate::cluster_routing::aggregate(results, op))
            }
            Some(ResponsePolicy::AggregateLogical(op)) => {
                future::try_join_all(receivers.into_iter().map(get_receiver))
                    .await
                    .and_then(|results| crate::cluster_routing::logical_aggregate(results, op))
            }
            Some(ResponsePolicy::CombineArrays) => {
                future::try_join_all(receivers.into_iter().map(get_receiver))
                    .await
                    .and_then(|results| match routing {
                        MultipleNodeRoutingInfo::MultiSlot(vec) => {
                            crate::cluster_routing::combine_and_sort_array_results(
                                results,
                                vec.iter().map(|(_, indices)| indices),
                            )
                        }
                        _ => crate::cluster_routing::combine_array_results(results),
                    })
            }
            Some(ResponsePolicy::Special) | None => {
                // This is our assumption - if there's no coherent way to aggregate the responses,
                // we just map each response to its node address and pass the map to the user.

                // TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes.
                future::try_join_all(receivers.into_iter().map(|(addr, receiver)| async move {
                    let result = convert_result(receiver.await)?;
                    Ok((Value::BulkString(addr.into_bytes()), result))
                }))
                .await
                .map(Value::Map)
            }
        }
    }

    async fn execute_on_multiple_nodes<'a>(
        cmd: &'a Arc<Cmd>,
        routing: &'a MultipleNodeRoutingInfo,
        core: Core<C>,
        response_policy: Option<ResponsePolicy>,
    ) -> OperationResult {
        let read_guard = core.conn_lock.read().await;
        if read_guard.0.is_empty() {
            return OperationResult::Err((
                OperationTarget::FanOut,
                (
                    ErrorKind::ClusterConnectionNotFound,
                    "No connections found for multi-node operation",
                )
                    .into(),
            ));
        }
        let (receivers, requests): (Vec<_>, Vec<_>) = {
            let to_request = |(addr, cmd): (&str, Arc<Cmd>)| {
                read_guard.0.get(addr).cloned().map(|conn| {
                    let (sender, receiver) = oneshot::channel();
                    let addr = addr.to_string();
                    (
                        (addr.clone(), receiver),
                        PendingRequest {
                            retry: 0,
                            sender: request::ResultExpectation::External(sender),
                            cmd: CmdArg::Cmd {
                                cmd,
                                routing: InternalSingleNodeRouting::Connection {
                                    identifier: addr,
                                    conn,
                                }
                                .into(),
                            },
                        },
                    )
                })
            };
            let slot_map = &read_guard.1;

            // TODO - these filter_map calls mean that we ignore missing nodes. Should we report an error in such cases?
            // Since some of the response policies drop other requests, mapping misses to errors here might mean that no request is sent.
            match routing {
                MultipleNodeRoutingInfo::AllNodes => slot_map
                    .addresses_for_all_nodes()
                    .into_iter()
                    .filter_map(|addr| to_request((addr, cmd.clone())))
                    .unzip(),
                MultipleNodeRoutingInfo::AllMasters => slot_map
                    .addresses_for_all_primaries()
                    .into_iter()
                    .filter_map(|addr| to_request((addr, cmd.clone())))
                    .unzip(),
                MultipleNodeRoutingInfo::MultiSlot(routes) => slot_map
                    .addresses_for_multi_slot(routes)
                    .enumerate()
                    .filter_map(|(index, addr_opt)| {
                        addr_opt.and_then(|addr| {
                            let (_, indices) = routes.get(index).unwrap();
                            let cmd =
                                Arc::new(crate::cluster_routing::command_for_multi_slot_indices(
                                    cmd.as_ref(),
                                    indices.iter(),
                                ));
                            to_request((addr, cmd))
                        })
                    })
                    .unzip(),
            }
        };
        drop(read_guard);
        core.pending_requests.lock().unwrap().extend(requests);

        Self::aggregate_results(receivers, routing, response_policy)
            .await
            .map(Response::Single)
            .map_err(|err| (OperationTarget::FanOut, err))
    }

    async fn try_cmd_request(
        cmd: Arc<Cmd>,
        routing: InternalRoutingInfo<C>,
        core: Core<C>,
    ) -> OperationResult {
        let route = match routing {
            InternalRoutingInfo::SingleNode(single_node_routing) => single_node_routing,
            InternalRoutingInfo::MultiNode((multi_node_routing, response_policy)) => {
                return Self::execute_on_multiple_nodes(
                    &cmd,
                    &multi_node_routing,
                    core,
                    response_policy,
                )
                .await;
            }
        };

        match Self::get_connection(route, core).await {
            Ok((addr, mut conn)) => conn
                .req_packed_command(&cmd)
                .await
                .and_then(|value| value.extract_error())
                .map(Response::Single)
                .map_err(|err| (addr.into(), err)),
            Err(err) => Err((OperationTarget::NotFound, err)),
        }
    }

    async fn try_pipeline_request(
        pipeline: Arc<crate::Pipeline>,
        offset: usize,
        count: usize,
        conn: impl Future<Output = RedisResult<(String, C)>>,
    ) -> OperationResult {
        match conn.await {
            Ok((addr, mut conn)) => conn
                .req_packed_commands(&pipeline, offset, count)
                .await
                .and_then(Value::extract_error_vec)
                .map(Response::Multiple)
                .map_err(|err| (OperationTarget::Node { address: addr }, err)),
            Err(err) => Err((OperationTarget::NotFound, err)),
        }
    }

    async fn try_request(cmd: CmdArg<C>, core: Core<C>) -> OperationResult {
        match cmd {
            CmdArg::Cmd { cmd, routing } => Self::try_cmd_request(cmd, routing, core).await,
            CmdArg::Pipeline {
                pipeline,
                offset,
                count,
                route,
            } => {
                Self::try_pipeline_request(
                    pipeline,
                    offset,
                    count,
                    Self::get_connection(route, core),
                )
                .await
            }
        }
    }

    async fn get_connection(
        route: InternalSingleNodeRouting<C>,
        core: Core<C>,
    ) -> RedisResult<(String, C)> {
        let read_guard = core.conn_lock.read().await;

        let conn = match route {
            InternalSingleNodeRouting::Random => None,
            InternalSingleNodeRouting::SpecificNode(route) => read_guard
                .1
                .slot_addr_for_route(&route)
                .map(|addr| addr.to_string()),
            InternalSingleNodeRouting::Connection { identifier, conn } => {
                return Ok((identifier, conn));
            }
            InternalSingleNodeRouting::Redirect { redirect, .. } => {
                drop(read_guard);
                // Redirected requests shouldn't use a random connection, so they have a separate codepath.
                return Self::get_redirected_connection(redirect, core).await;
            }
            InternalSingleNodeRouting::ByAddress(address) => {
                if let Some(conn) = read_guard.0.get(&address).cloned() {
                    return Ok((address, conn));
                } else {
                    return Err((
                        ErrorKind::ClientError,
                        "Requested connection not found",
                        address,
                    )
                        .into());
                }
            }
        }
        .map(|addr| {
            let conn = read_guard.0.get(&addr).cloned();
            (addr, conn)
        });
        drop(read_guard);

        let addr_conn_option = match conn {
            Some((addr, Some(conn))) => Some((addr, conn)),
            Some((addr, None)) => connect_check_and_add(core.clone(), addr.clone())
                .await
                .ok()
                .map(|conn| (addr, conn)),
            None => None,
        };

        let (addr, conn) = match addr_conn_option {
            Some(tuple) => tuple,
            None => {
                let read_guard = core.conn_lock.read().await;
                if let Some((random_addr, random_conn)) = get_random_connection(&read_guard.0) {
                    drop(read_guard);
                    (random_addr, random_conn)
                } else {
                    return Err(
                        (ErrorKind::ClusterConnectionNotFound, "No connections found").into(),
                    );
                }
            }
        };

        Ok((addr, conn))
    }

    async fn get_redirected_connection(
        redirect: Redirect,
        core: Core<C>,
    ) -> RedisResult<(String, C)> {
        let asking = matches!(redirect, Redirect::Ask(_));
        let addr = match redirect {
            Redirect::Moved(addr) => addr,
            Redirect::Ask(addr) => addr,
        };
        let read_guard = core.conn_lock.read().await;
        let conn = read_guard.0.get(&addr).cloned();
        drop(read_guard);
        let mut conn = match conn {
            Some(conn) => conn,
            None => connect_check_and_add(core.clone(), addr.clone()).await?,
        };
        if asking {
            let _ = conn
                .req_packed_command(&crate::cmd::cmd("ASKING"))
                .await
                .and_then(|value| value.extract_error());
        }

        Ok((addr, conn))
    }

    fn poll_recover(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), RedisError>> {
        let recover_future = match &mut self.state {
            ConnectionState::PollComplete => return Poll::Ready(Ok(())),
            ConnectionState::Recover(future) => future,
        };
        let res = match recover_future {
            RecoverFuture::RecoverSlots(ref mut future) => match ready!(future.as_mut().poll(cx)) {
                Ok(_) => {
                    trace!("Recovered!");
                    self.state = ConnectionState::PollComplete;
                    Ok(())
                }
                Err(err) => {
                    trace!("Recover slots failed!");
                    *future = Box::pin(Self::refresh_slots(self.inner.clone()));
                    Err(err)
                }
            },
            RecoverFuture::Reconnect(ref mut future) => {
                ready!(future.as_mut().poll(cx));
                trace!("Reconnected connections");
                self.state = ConnectionState::PollComplete;
                Ok(())
            }
        };
        if res.is_ok() {
            self.resubscribe();
        }
        Poll::Ready(res)
    }

    fn poll_complete(&mut self, cx: &mut task::Context<'_>) -> Poll<PollFlushAction> {
        let mut poll_flush_action = PollFlushAction::None;

        let mut pending_requests_guard = self.inner.pending_requests.lock().unwrap();
        if !pending_requests_guard.is_empty() {
            let mut pending_requests = mem::take(&mut *pending_requests_guard);
            for request in pending_requests.drain(..) {
                // Drop the request if no one is waiting for a response, to free up resources for
                // requests callers care about (load shedding). It will be ambiguous whether the
                // request actually goes through regardless.
                if request.sender.is_closed() {
                    continue;
                }

                let future = Self::try_request(request.cmd.clone(), self.inner.clone()).boxed();
                self.in_flight_requests.push(Box::pin(Request {
                    retry_params: self.inner.cluster_params.retry_params.clone(),
                    request: Some(request),
                    future: RequestState::Future { future },
                }));
            }
            *pending_requests_guard = pending_requests;
        }
        drop(pending_requests_guard);

        loop {
            let (request_handling, next) =
                match Pin::new(&mut self.in_flight_requests).poll_next(cx) {
                    Poll::Ready(Some(result)) => result,
                    Poll::Ready(None) | Poll::Pending => break,
                };
            match request_handling {
                Some(Retry::MoveToPending { request }) => {
                    self.inner.pending_requests.lock().unwrap().push(request);
                }
                Some(Retry::Immediately { request }) => {
                    let future = Self::try_request(request.cmd.clone(), self.inner.clone());
                    self.in_flight_requests.push(Box::pin(Request {
                        retry_params: self.inner.cluster_params.retry_params.clone(),
                        request: Some(request),
                        future: RequestState::Future {
                            future: Box::pin(future),
                        },
                    }));
                }
                Some(Retry::AfterSleep {
                    request,
                    sleep_duration,
                }) => {
                    let future = RequestState::Sleep {
                        sleep: boxed_sleep(sleep_duration),
                    };
                    self.in_flight_requests.push(Box::pin(Request {
                        retry_params: self.inner.cluster_params.retry_params.clone(),
                        request: Some(request),
                        future,
                    }));
                }
                None => {}
            };
            poll_flush_action = poll_flush_action.change_state(next);
        }

        if !matches!(poll_flush_action, PollFlushAction::None) || self.in_flight_requests.is_empty()
        {
            Poll::Ready(poll_flush_action)
        } else {
            Poll::Pending
        }
    }

    fn send_refresh_error(&mut self) {
        if self.refresh_error.is_some() {
            if let Some(mut request) = Pin::new(&mut self.in_flight_requests)
                .iter_pin_mut()
                .find(|request| request.request.is_some())
            {
                (*request)
                    .as_mut()
                    .respond(Err(self.refresh_error.take().unwrap()));
            } else {
                // Use a separate binding for this to release the lock guard before calling send.
                let maybe_request = self.inner.pending_requests.lock().unwrap().pop();
                if let Some(request) = maybe_request {
                    request.sender.send(Err(self.refresh_error.take().unwrap()));
                }
            }
        }
    }

    async fn get_or_create_conn(
        addr: &str,
        conn_option: Option<C>,
        params: &ClusterParams,
    ) -> RedisResult<C> {
        if let Some(mut conn) = conn_option {
            match check_connection(&mut conn).await {
                Ok(_) => Ok(conn),
                Err(_) => connect_and_check(addr, params.clone()).await,
            }
        } else {
            connect_and_check(addr, params.clone()).await
        }
    }
}

#[derive(Debug, PartialEq)]
enum PollFlushAction {
    None,
    RebuildSlots,
    Reconnect(Vec<String>),
    ReconnectFromInitialConnections,
}

impl PollFlushAction {
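    /// Merges two flush actions, keeping the more drastic recovery step:
    /// reconnecting from the initial nodes trumps rebuilding the slot map,
    /// which in turn trumps reconnecting to specific addresses.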
    fn change_state(self, next_state: PollFlushAction) -> PollFlushAction {
        match (self, next_state) {
            (PollFlushAction::None, next_state) => next_state,
            (next_state, PollFlushAction::None) => next_state,
            (PollFlushAction::ReconnectFromInitialConnections, _)
            | (_, PollFlushAction::ReconnectFromInitialConnections) => {
                PollFlushAction::ReconnectFromInitialConnections
            }

            (PollFlushAction::RebuildSlots, _) | (_, PollFlushAction::RebuildSlots) => {
                PollFlushAction::RebuildSlots
            }

            (PollFlushAction::Reconnect(mut addrs), PollFlushAction::Reconnect(new_addrs)) => {
                addrs.extend(new_addrs);
                Self::Reconnect(addrs)
            }
        }
    }
}

impl<C> Sink<Message<C>> for ClusterConnInner<C>
where
    C: ConnectionLike + Connect + Clone + Send + Sync + Unpin + 'static,
{
    type Error = ();

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut task::Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, msg: Message<C>) -> Result<(), Self::Error> {
        trace!("start_send");
        let Message { cmd, sender } = msg;

        if let Some(tracker) = &self.inner.subscription_tracker {
            // TODO - benchmark whether checking if the command is a subscription outside of the mutex is more performant.
            let mut tracker = tracker.lock().unwrap();
            match &cmd {
                CmdArg::Cmd { cmd, .. } => tracker.update_with_cmd(cmd.as_ref()),
                CmdArg::Pipeline { pipeline, .. } => {
                    tracker.update_with_pipeline(pipeline.as_ref())
                }
            }
        };

        self.inner
            .pending_requests
            .lock()
            .unwrap()
            .push(PendingRequest {
                retry: 0,
                sender: request::ResultExpectation::External(sender),
                cmd,
            });
        Ok(())
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context,
    ) -> Poll<Result<(), Self::Error>> {
        trace!("poll_flush: {:?}", self.state);
        loop {
            self.send_refresh_error();

            if let Err(err) = ready!(self.as_mut().poll_recover(cx)) {
                // We failed to reconnect. We will try again, but meanwhile we report the
                // error if we can, to avoid getting trapped in an infinite loop of
                // reconnect attempts.
                self.refresh_error = Some(err);

                // Give other tasks a chance to progress before we try to recover
                // again. Since the future may not have registered a wake-up, we do so
                // now so the task is not forgotten.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }

            match ready!(self.poll_complete(cx)) {
                PollFlushAction::None => return Poll::Ready(Ok(())),
                PollFlushAction::RebuildSlots => {
                    self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
                        Self::refresh_slots(self.inner.clone()),
                    )));
                }
                PollFlushAction::Reconnect(addrs) => {
                    self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
                        self.refresh_connections(addrs),
                    )));
                }
                PollFlushAction::ReconnectFromInitialConnections => {
                    self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
                        self.reconnect_to_initial_nodes(),
                    )));
                }
            }
        }
    }

    fn poll_close(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context,
    ) -> Poll<Result<(), Self::Error>> {
        // Try to drive any in-flight requests to completion.
        match self.poll_complete(cx) {
            Poll::Ready(PollFlushAction::None) => (),
            Poll::Ready(_) => Err(())?,
            Poll::Pending => (),
        };
        // If we no longer have any requests in flight we are done (skips any reconnection
        // attempts).
        if self.in_flight_requests.is_empty() {
            return Poll::Ready(Ok(()));
        }

        self.poll_flush(cx)
    }
}

impl<C> ConnectionLike for ClusterConnection<C>
where
    C: ConnectionLike + Send + Clone + Unpin + Sync + Connect + 'static,
{
    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
        let routing = RoutingInfo::for_routable(cmd)
            .unwrap_or(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random));
        self.route_command(cmd, routing).boxed()
    }

    fn req_packed_commands<'a>(
        &'a mut self,
        pipeline: &'a crate::Pipeline,
        offset: usize,
        count: usize,
    ) -> RedisFuture<'a, Vec<Value>> {
        async move {
            let route = route_for_pipeline(pipeline)?;
            self.route_pipeline(pipeline, offset, count, route.into())
                .await
        }
        .boxed()
    }

    fn get_db(&self) -> i64 {
        0
    }
}

/// Implements the process of connecting to a Redis server
/// and obtaining a connection handle.
pub trait Connect: Sized {
    /// Connect to a node, returning a handle for command execution.
    fn connect_with_config<'a, T>(info: T, config: AsyncConnectionConfig) -> RedisFuture<'a, Self>
    where
        T: IntoConnectionInfo + Send + 'a;
}

impl Connect for MultiplexedConnection {
    fn connect_with_config<'a, T>(info: T, config: AsyncConnectionConfig) -> RedisFuture<'a, Self>
    where
        T: IntoConnectionInfo + Send + 'a,
    {
        async move {
            let connection_info = info.into_connection_info()?;
            let client = crate::Client::open(connection_info)?;
            client
                .get_multiplexed_async_connection_with_config(&config)
                .await
        }
        .boxed()
    }
}

async fn connect_check_and_add<C>(core: Core<C>, addr: String) -> RedisResult<C>
where
    C: ConnectionLike + Connect + Send + Clone + 'static,
{
    match connect_and_check::<C>(&addr, core.cluster_params.clone()).await {
        Ok(conn) => {
            let conn_clone = conn.clone();
            core.conn_lock.write().await.0.insert(addr, conn_clone);
            Ok(conn)
        }
        Err(err) => Err(err),
    }
}

async fn connect_and_check<C>(node: &str, params: ClusterParams) -> RedisResult<C>
where
    C: ConnectionLike + Connect + Send + 'static,
{
    let read_from_replicas = params.read_from_replicas;
    let connection_timeout = params.connection_timeout;
    let response_timeout = params.response_timeout;
    let push_sender = params.async_push_sender.clone();
    let tcp_settings = params.tcp_settings.clone();
    let dns_resolver = params.async_dns_resolver.clone();
    #[cfg(feature = "cache-aio")]
    let cache_manager = params.cache_manager.clone();
    let info = get_connection_info(node, params)?;
    let mut config = AsyncConnectionConfig::default()
        .set_connection_timeout(connection_timeout)
        .set_tcp_settings(tcp_settings);
    if let Some(response_timeout) = response_timeout {
        config = config.set_response_timeout(response_timeout);
    };
    if let Some(push_sender) = push_sender {
        config = config.set_push_sender_internal(push_sender);
    }
    if let Some(resolver) = dns_resolver {
        config = config.set_dns_resolver_internal(resolver.clone());
    }
    #[cfg(feature = "cache-aio")]
    if let Some(cache_manager) = cache_manager {
        config = config.set_cache_manager(cache_manager.clone_and_increase_epoch());
    }
    let mut conn = match C::connect_with_config(info, config).await {
        Ok(conn) => conn,
        Err(err) => {
            warn!("Failed to connect to node: {node:?}, due to: {err:?}");
            return Err(err);
        }
    };

    let check = if read_from_replicas {
        // If READONLY is sent to primary nodes, it will have no effect.
        cmd("READONLY")
    } else {
        cmd("PING")
    };

    conn.req_packed_command(&check).await?;
    Ok(conn)
}

async fn check_connection<C>(conn: &mut C) -> RedisResult<()>
where
    C: ConnectionLike + Send + 'static,
{
    let mut cmd = Cmd::new();
    cmd.arg("PING");
    cmd.query_async::<String>(conn).await?;
    Ok(())
}

fn get_random_connection<C>(connections: &ConnectionMap<C>) -> Option<(String, C)>
where
    C: Clone,
{
    connections.keys().choose(&mut rng()).and_then(|addr| {
        connections
            .get(addr)
            .map(|conn| (addr.clone(), conn.clone()))
    })
}