redis/cluster_async/mod.rs

//! This module provides async functionality for connecting to Redis / Valkey clusters.
//!
//! The cluster connection is meant to abstract the fact that a cluster is composed of multiple nodes,
//! and to provide an API which is as close as possible to that of a single node connection. In order to do that,
//! the cluster connection maintains connections to each node in the Redis / Valkey cluster, and can route
//! requests automatically to the relevant nodes. If the cluster connection receives indications
//! that the cluster topology has changed, it will query nodes to discover the current cluster topology.
//! If it disconnects from some nodes, it will automatically reconnect to those nodes.
//!
//! By default, [`ClusterConnection`] makes use of [`MultiplexedConnection`] and maintains a pool
//! of connections to each node in the cluster.
//!
//! # Example
//! ```rust,no_run
//! use redis::cluster::ClusterClient;
//! use redis::AsyncCommands;
//!
//! async fn fetch_an_integer() -> String {
//!     let nodes = vec!["redis://127.0.0.1/"];
//!     let client = ClusterClient::new(nodes).unwrap();
//!     let mut connection = client.get_async_connection().await.unwrap();
//!     let _: () = connection.set("test", "test_data").await.unwrap();
//!     let rv: String = connection.get("test").await.unwrap();
//!     rv
//! }
//! ```
//!
//! # Pipelining
//! ```rust,no_run
//! use redis::cluster::ClusterClient;
//! use redis::{Value, AsyncCommands};
//!
//! async fn fetch_an_integer() -> redis::RedisResult<()> {
//!     let nodes = vec!["redis://127.0.0.1/"];
//!     let client = ClusterClient::new(nodes).unwrap();
//!     let mut connection = client.get_async_connection().await.unwrap();
//!     let key = "test";
//!
//!     redis::pipe()
//!         .rpush(key, "123").ignore()
//!         .ltrim(key, -10, -1).ignore()
//!         .expire(key, 60).ignore()
//!         .exec_async(&mut connection).await
//! }
//! ```
//!
//! # Pubsub
//!
//! Pubsub, and generally receiving push messages from the cluster nodes, is now supported
//! when defining a connection with [crate::ProtocolVersion::RESP3] and some
//! [crate::aio::AsyncPushSender] to receive the messages on.
//!
//! ```rust,no_run
//! use redis::cluster::ClusterClientBuilder;
//! use redis::{Value, AsyncCommands};
//!
//! async fn fetch_an_integer() -> redis::RedisResult<()> {
//!     let nodes = vec!["redis://127.0.0.1/?protocol=3"];
//!     let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
//!     let client = ClusterClientBuilder::new(nodes)
//!         .use_protocol(redis::ProtocolVersion::RESP3)
//!         .push_sender(tx).build()?;
//!     let mut connection = client.get_async_connection().await?;
//!     connection.subscribe("channel").await?;
//!     while let Some(msg) = rx.recv().await {
//!         println!("Got: {:?}", msg);
//!     }
//!     Ok(())
//! }
//! ```
//!
//! # Sending requests to a specific node
//! In some cases you may want to send a request to a specific node in the cluster, instead of
//! letting the cluster connection decide by itself to which node it should send the request.
//! This can happen, for example, if you want to send SCAN commands to each node in the cluster.
//!
//! ```rust,no_run
//! use redis::cluster::ClusterClient;
//! use redis::{Value, AsyncCommands};
//! use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo};
//!
//! async fn fetch_an_integer() -> redis::RedisResult<Value> {
//!     let nodes = vec!["redis://127.0.0.1/"];
//!     let client = ClusterClient::new(nodes)?;
//!     let mut connection = client.get_async_connection().await?;
//!     let routing_info = RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress{
//!         host: "127.0.0.1".to_string(),
//!         port: 6378
//!     });
//!     connection.route_command(&redis::cmd("PING"), routing_info).await
//! }
//! ```
use std::{
    collections::HashMap,
    fmt,
    future::Future,
    io, mem,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{self, Poll},
    time::Duration,
};

mod request;
mod routing;
use crate::{
    aio::{check_resp3, ConnectionLike, HandleContainer, MultiplexedConnection, Runtime},
    cluster::{get_connection_info, slot_cmd},
    cluster_client::ClusterParams,
    cluster_routing::{
        MultipleNodeRoutingInfo, Redirect, ResponsePolicy, RoutingInfo, SingleNodeRoutingInfo,
        Slot, SlotMap,
    },
    cluster_topology::parse_slots,
    cmd,
    subscription_tracker::SubscriptionTracker,
    types::closed_connection_error,
    AsyncConnectionConfig, Cmd, ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError,
    RedisFuture, RedisResult, ToRedisArgs, Value,
};

use crate::ProtocolVersion;
use futures_sink::Sink;
use futures_util::{
    future::{self, BoxFuture, FutureExt},
    ready,
    stream::{self, Stream, StreamExt},
};
use log::{debug, trace, warn};
use rand::{rng, seq::IteratorRandom};
use request::{CmdArg, PendingRequest, Request, RequestState, Retry};
use routing::{route_for_pipeline, InternalRoutingInfo, InternalSingleNodeRouting};
use tokio::sync::{mpsc, oneshot, RwLock};

struct ClientSideState {
    protocol: ProtocolVersion,
    _task_handle: HandleContainer,
    response_timeout: Option<Duration>,
    runtime: Runtime,
}

/// This represents an async Redis Cluster connection.
///
/// It stores the underlying connections maintained for each node in the cluster,
/// as well as common parameters for connecting to nodes and executing commands.
#[derive(Clone)]
pub struct ClusterConnection<C = MultiplexedConnection> {
    state: Arc<ClientSideState>,
    sender: mpsc::Sender<Message<C>>,
}

impl<C> ClusterConnection<C>
where
    C: ConnectionLike + Connect + Clone + Send + Sync + Unpin + 'static,
{
    pub(crate) async fn new(
        initial_nodes: &[ConnectionInfo],
        cluster_params: ClusterParams,
    ) -> RedisResult<ClusterConnection<C>> {
        let protocol = cluster_params.protocol.unwrap_or_default();
        let response_timeout = cluster_params.response_timeout;
        let runtime = Runtime::locate();
        ClusterConnInner::new(initial_nodes, cluster_params)
            .await
            .map(|inner| {
                let (sender, mut receiver) = mpsc::channel::<Message<_>>(100);
                let stream = async move {
                    let _ = stream::poll_fn(move |cx| receiver.poll_recv(cx))
                        .map(Ok)
                        .forward(inner)
                        .await;
                };
                let _task_handle = HandleContainer::new(runtime.spawn(stream));

                ClusterConnection {
                    sender,
                    state: Arc::new(ClientSideState {
                        protocol,
                        _task_handle,
                        response_timeout,
                        runtime,
                    }),
                }
            })
    }

    /// Send a command according to the given `routing`, aggregating multi-node responses
    /// according to the response policy carried by the routing, if any.
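    ///
    /// A minimal usage sketch (assuming a cluster reachable at `redis://127.0.0.1/`; the choice
    /// of a random node here is arbitrary):
    ///
    /// ```rust,no_run
    /// use redis::cluster::ClusterClient;
    /// use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo};
    ///
    /// async fn ping_random_node() -> redis::RedisResult<redis::Value> {
    ///     let client = ClusterClient::new(vec!["redis://127.0.0.1/"])?;
    ///     let mut connection = client.get_async_connection().await?;
    ///     // Route a PING to one random node instead of routing by key slot.
    ///     let routing = RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random);
    ///     connection.route_command(&redis::cmd("PING"), routing).await
    /// }
    /// ```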
    pub async fn route_command(&mut self, cmd: &Cmd, routing: RoutingInfo) -> RedisResult<Value> {
        trace!("send_packed_command");
        let (sender, receiver) = oneshot::channel();
        let request = async {
            self.sender
                .send(Message {
                    cmd: CmdArg::Cmd {
                        cmd: Arc::new(cmd.clone()), // TODO Remove this clone?
                        routing: routing.into(),
                    },
                    sender,
                })
                .await
                .map_err(|_| {
                    RedisError::from(io::Error::new(
                        io::ErrorKind::BrokenPipe,
                        "redis_cluster: Unable to send command",
                    ))
                })?;

            receiver
                .await
                .unwrap_or_else(|_| {
                    Err(RedisError::from(io::Error::new(
                        io::ErrorKind::BrokenPipe,
                        "redis_cluster: Unable to receive command",
                    )))
                })
                .map(|response| match response {
                    Response::Single(value) => value,
                    Response::Multiple(_) => unreachable!(),
                })
        };

        match self.state.response_timeout {
            Some(duration) => self.state.runtime.timeout(duration, request).await?,
            None => request.await,
        }
    }

    /// Send the commands in `pipeline` to the node chosen by `route`. With
    /// [SingleNodeRoutingInfo::Random], the commands will be sent to a random node.
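    ///
    /// A minimal sketch (assuming a cluster at `redis://127.0.0.1/`): `offset` responses are
    /// skipped and `count` responses are returned, mirroring
    /// [crate::aio::ConnectionLike::req_packed_commands].
    ///
    /// ```rust,no_run
    /// use redis::cluster::ClusterClient;
    /// use redis::cluster_routing::SingleNodeRoutingInfo;
    ///
    /// async fn pipeline_on_random_node() -> redis::RedisResult<Vec<redis::Value>> {
    ///     let client = ClusterClient::new(vec!["redis://127.0.0.1/"])?;
    ///     let mut connection = client.get_async_connection().await?;
    ///     let mut pipe = redis::pipe();
    ///     pipe.cmd("SET").arg("key").arg("value")
    ///         .cmd("GET").arg("key");
    ///     // offset 0, count 2: skip no responses, return both of them.
    ///     connection.route_pipeline(&pipe, 0, 2, SingleNodeRoutingInfo::Random).await
    /// }
    /// ```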
    pub async fn route_pipeline<'a>(
        &'a mut self,
        pipeline: &'a crate::Pipeline,
        offset: usize,
        count: usize,
        route: SingleNodeRoutingInfo,
    ) -> RedisResult<Vec<Value>> {
        let (sender, receiver) = oneshot::channel();

        let request = async {
            self.sender
                .send(Message {
                    cmd: CmdArg::Pipeline {
                        pipeline: Arc::new(pipeline.clone()), // TODO Remove this clone?
                        offset,
                        count,
                        route: route.into(),
                    },
                    sender,
                })
                .await
                .map_err(|_| closed_connection_error())?;
            receiver
                .await
                .unwrap_or_else(|_| Err(closed_connection_error()))
                .map(|response| match response {
                    Response::Multiple(values) => values,
                    Response::Single(_) => unreachable!(),
                })
        };

        match self.state.response_timeout {
            Some(duration) => self.state.runtime.timeout(duration, request).await?,
            None => request.await,
        }
    }

    /// Subscribes to new channel(s).
    ///
    /// Updates from the sender will be sent on the push sender that was passed to the manager.
    /// If the manager was configured without a push sender, the connection won't be able to pass messages back to the user.
    ///
    /// This method is only available when the connection uses the RESP3 protocol, and will return an error otherwise.
    /// Note that the subscription will automatically be re-established after disconnections, so the user might
    /// receive additional pushes with [crate::PushKind::Subscribe] later, after the subscription completes.
    pub async fn subscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
        check_resp3!(self.state.protocol);
        let mut cmd = cmd("SUBSCRIBE");
        cmd.arg(channel_name);
        cmd.exec_async(self).await?;
        Ok(())
    }

    /// Unsubscribes from channel(s).
    ///
    /// This method is only available when the connection uses the RESP3 protocol, and will return an error otherwise.
    pub async fn unsubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
        check_resp3!(self.state.protocol);
        let mut cmd = cmd("UNSUBSCRIBE");
        cmd.arg(channel_name);
        cmd.exec_async(self).await?;
        Ok(())
    }

    /// Subscribes to new channel(s) with pattern(s).
    ///
    /// Updates from the sender will be sent on the push sender that was passed to the manager.
    /// If the manager was configured without a push sender, the connection won't be able to pass messages back to the user.
    ///
    /// This method is only available when the connection uses the RESP3 protocol, and will return an error otherwise.
    /// Note that the subscription will automatically be re-established after disconnections, so the user might
    /// receive additional pushes with [crate::PushKind::PSubscribe] later, after the subscription completes.
    pub async fn psubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> {
        check_resp3!(self.state.protocol);
        let mut cmd = cmd("PSUBSCRIBE");
        cmd.arg(channel_pattern);
        cmd.exec_async(self).await?;
        Ok(())
    }

    /// Unsubscribes from channel pattern(s).
    ///
    /// This method is only available when the connection uses the RESP3 protocol, and will return an error otherwise.
    pub async fn punsubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> {
        check_resp3!(self.state.protocol);
        let mut cmd = cmd("PUNSUBSCRIBE");
        cmd.arg(channel_pattern);
        cmd.exec_async(self).await?;
        Ok(())
    }

    /// Subscribes to new sharded channel(s).
    ///
    /// Updates from the sender will be sent on the push sender that was passed to the manager.
    /// If the manager was configured without a push sender, the connection won't be able to pass messages back to the user.
    ///
    /// This method is only available when the connection uses the RESP3 protocol, and will return an error otherwise.
    /// Note that the subscription will automatically be re-established after disconnections, so the user might
    /// receive additional pushes with [crate::PushKind::SSubscribe] later, after the subscription completes.
    pub async fn ssubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
        check_resp3!(self.state.protocol);
        let mut cmd = cmd("SSUBSCRIBE");
        cmd.arg(channel_name);
        cmd.exec_async(self).await?;
        Ok(())
    }

    /// Unsubscribes from sharded channel(s).
    ///
    /// This method is only available when the connection uses the RESP3 protocol, and will return an error otherwise.
    pub async fn sunsubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
        check_resp3!(self.state.protocol);
        let mut cmd = cmd("SUNSUBSCRIBE");
        cmd.arg(channel_name);
        cmd.exec_async(self).await?;
        Ok(())
    }
}

type ConnectionFuture<C> = future::Shared<BoxFuture<'static, C>>;
type ConnectionMap<C> = HashMap<String, ConnectionFuture<C>>;

/// This is the internal representation of an async Redis Cluster connection. It stores the
/// underlying connections maintained for each node in the cluster, as well
/// as common parameters for connecting to nodes and executing commands.
struct InnerCore<C> {
    conn_lock: RwLock<(ConnectionMap<C>, SlotMap)>,
    cluster_params: ClusterParams,
    pending_requests: Mutex<Vec<PendingRequest<C>>>,
    initial_nodes: Vec<ConnectionInfo>,
    subscription_tracker: Option<Mutex<SubscriptionTracker>>,
}

type Core<C> = Arc<InnerCore<C>>;

/// This is the sink for requests sent by the user.
/// It holds the stream of requests that are "in flight", i.e. on their way to the server,
/// as well as the inner representation of the connection.
struct ClusterConnInner<C> {
    inner: Core<C>,
    state: ConnectionState,
    #[allow(clippy::complexity)]
    in_flight_requests: stream::FuturesUnordered<Pin<Box<Request<C>>>>,
    refresh_error: Option<RedisError>,
}

fn boxed_sleep(duration: Duration) -> BoxFuture<'static, ()> {
    Box::pin(Runtime::locate_and_sleep(duration))
}

#[derive(Debug, PartialEq)]
pub(crate) enum Response {
    Single(Value),
    Multiple(Vec<Value>),
}

enum OperationTarget {
    Node { address: String },
    NotFound,
    FanOut,
}
type OperationResult = Result<Response, (OperationTarget, RedisError)>;

impl From<String> for OperationTarget {
    fn from(address: String) -> Self {
        OperationTarget::Node { address }
    }
}

struct Message<C> {
    cmd: CmdArg<C>,
    sender: oneshot::Sender<RedisResult<Response>>,
}

enum RecoverFuture {
    RecoverSlots(BoxFuture<'static, RedisResult<()>>),
    Reconnect(BoxFuture<'static, ()>),
}

enum ConnectionState {
    PollComplete,
    Recover(RecoverFuture),
}

impl fmt::Debug for ConnectionState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}",
            match self {
                ConnectionState::PollComplete => "PollComplete",
                ConnectionState::Recover(_) => "Recover",
            }
        )
    }
}

impl<C> ClusterConnInner<C>
where
    C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
{
    async fn new(
        initial_nodes: &[ConnectionInfo],
        cluster_params: ClusterParams,
    ) -> RedisResult<Self> {
        let connections = Self::create_initial_connections(initial_nodes, &cluster_params).await?;
        let subscription_tracker = if cluster_params.async_push_sender.is_some() {
            Some(Mutex::new(SubscriptionTracker::default()))
        } else {
            None
        };
        let inner = Arc::new(InnerCore {
            conn_lock: RwLock::new((connections, SlotMap::new(cluster_params.read_from_replicas))),
            cluster_params,
            pending_requests: Mutex::new(Vec::new()),
            initial_nodes: initial_nodes.to_vec(),
            subscription_tracker,
        });
        let connection = ClusterConnInner {
            inner,
            in_flight_requests: Default::default(),
            refresh_error: None,
            state: ConnectionState::PollComplete,
        };
        Self::refresh_slots(connection.inner.clone()).await?;
        Ok(connection)
    }

    async fn create_initial_connections(
        initial_nodes: &[ConnectionInfo],
        params: &ClusterParams,
    ) -> RedisResult<ConnectionMap<C>> {
        let (connections, error) = stream::iter(initial_nodes.iter().cloned())
            .map(|info| {
                let params = params.clone();
                async move {
                    let addr = info.addr.to_string();
                    let result = connect_and_check(&addr, params).await;
                    match result {
                        Ok(conn) => Ok((addr, async { conn }.boxed().shared())),
                        Err(e) => {
                            debug!("Failed to connect to initial node: {:?}", e);
                            Err(e)
                        }
                    }
                }
            })
            .buffer_unordered(initial_nodes.len())
            .fold(
                (ConnectionMap::<C>::with_capacity(initial_nodes.len()), None),
                |(mut connections, mut error), result| async move {
                    match result {
                        Ok((addr, conn)) => {
                            connections.insert(addr, conn);
                        }
                        Err(err) => {
                            // Store at least one error to use as detail in the connection error if
                            // all connections fail.
                            error = Some(err);
                        }
                    }
                    (connections, error)
                },
            )
            .await;
        if connections.is_empty() {
            if let Some(err) = error {
                return Err(RedisError::from((
                    ErrorKind::IoError,
                    "Failed to create initial connections",
                    err.to_string(),
                )));
            } else {
                return Err(RedisError::from((
                    ErrorKind::IoError,
                    "Failed to create initial connections",
                )));
            }
        }
        Ok(connections)
    }

    fn resubscribe(&self) {
        let Some(subscription_tracker) = self.inner.subscription_tracker.as_ref() else {
            return;
        };

        let subscription_pipe = subscription_tracker
            .lock()
            .unwrap()
            .get_subscription_pipeline();

        // We send one request per command, instead of sending the pipeline as a whole, so that
        // each command is routed to its relevant node rather than all of them going to a single node.
        let requests = subscription_pipe.cmd_iter().map(|cmd| {
            let routing = RoutingInfo::for_routable(cmd)
                .unwrap_or(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
                .into();
            PendingRequest {
                retry: 0,
                sender: request::ResultExpectation::Internal,
                cmd: CmdArg::Cmd {
                    cmd: Arc::new(cmd.clone()),
                    routing,
                },
            }
        });
        self.inner.pending_requests.lock().unwrap().extend(requests);
    }

    fn reconnect_to_initial_nodes(&mut self) -> impl Future<Output = ()> {
        debug!("Received request to reconnect to initial nodes");
        let inner = self.inner.clone();
        async move {
            let connection_map =
                match Self::create_initial_connections(&inner.initial_nodes, &inner.cluster_params)
                    .await
                {
                    Ok(map) => map,
                    Err(err) => {
                        warn!("Can't reconnect to initial nodes: `{err}`");
                        return;
                    }
                };
            let mut write_lock = inner.conn_lock.write().await;
            *write_lock = (
                connection_map,
                SlotMap::new(inner.cluster_params.read_from_replicas),
            );
            drop(write_lock);
            if let Err(err) = Self::refresh_slots(inner.clone()).await {
                warn!("Can't refresh slots with initial nodes: `{err}`");
            };
        }
    }

    fn refresh_connections(&mut self, addrs: Vec<String>) -> impl Future<Output = ()> {
        let inner = self.inner.clone();
        async move {
            let mut write_guard = inner.conn_lock.write().await;
            let mut connections = stream::iter(addrs)
                .fold(
                    mem::take(&mut write_guard.0),
                    |mut connections, addr| async {
                        let conn = Self::get_or_create_conn(
                            &addr,
                            connections.remove(&addr),
                            &inner.cluster_params,
                        )
                        .await;
                        if let Ok(conn) = conn {
                            connections.insert(addr, async { conn }.boxed().shared());
                        }
                        connections
                    },
                )
                .await;
            write_guard.0 = mem::take(&mut connections);
        }
    }

    // Query a node to discover slot -> master mappings.
    async fn refresh_slots(inner: Core<C>) -> RedisResult<()> {
        let mut write_guard = inner.conn_lock.write().await;
        let mut connections = mem::take(&mut write_guard.0);
        let slots = &mut write_guard.1;
        let mut result = Ok(());
        for (addr, conn) in connections.iter_mut() {
            let mut conn = conn.clone().await;
            let value = match conn
                .req_packed_command(&slot_cmd())
                .await
                .and_then(|value| value.extract_error())
            {
                Ok(value) => value,
                Err(err) => {
                    result = Err(err);
                    continue;
                }
            };
            match parse_slots(
                value,
                inner.cluster_params.tls,
                addr.rsplit_once(':').unwrap().0,
            )
            .and_then(|v: Vec<Slot>| Self::build_slot_map(slots, v))
            {
                Ok(_) => {
                    result = Ok(());
                    break;
                }
                Err(err) => result = Err(err),
            }
        }
        result?;

        let mut nodes = write_guard.1.values().flatten().collect::<Vec<_>>();
        nodes.sort_unstable();
        nodes.dedup();
        let nodes_len = nodes.len();
        let addresses_and_connections_iter = nodes
            .into_iter()
            .map(|addr| (addr, connections.remove(addr)));

        write_guard.0 = stream::iter(addresses_and_connections_iter)
            .fold(
                HashMap::with_capacity(nodes_len),
                |mut connections, (addr, connection)| async {
                    let conn =
                        Self::get_or_create_conn(addr, connection, &inner.cluster_params).await;
                    if let Ok(conn) = conn {
                        connections.insert(addr.to_string(), async { conn }.boxed().shared());
                    }
                    connections
                },
            )
            .await;

        Ok(())
    }

    fn build_slot_map(slot_map: &mut SlotMap, slots_data: Vec<Slot>) -> RedisResult<()> {
        slot_map.clear();
        slot_map.fill_slots(slots_data);
        trace!("{:?}", slot_map);
        Ok(())
    }

    async fn aggregate_results(
        receivers: Vec<(String, oneshot::Receiver<RedisResult<Response>>)>,
        routing: &MultipleNodeRoutingInfo,
        response_policy: Option<ResponsePolicy>,
    ) -> RedisResult<Value> {
        if receivers.is_empty() {
            return Err((
                ErrorKind::ClusterConnectionNotFound,
                "No nodes found for multi-node operation",
            )
                .into());
        }

        let extract_result = |response| match response {
            Response::Single(value) => value,
            Response::Multiple(_) => unreachable!(),
        };

        let convert_result = |res: Result<RedisResult<Response>, _>| {
            // The error case happens only if the result sender is dropped before usage.
            res.map_err(|_| {
                RedisError::from((
                    ErrorKind::ResponseError,
                    "request wasn't handled due to internal failure",
                ))
            })
            .and_then(|res| res.map(extract_result))
        };

        let get_receiver = |(_, receiver): (_, oneshot::Receiver<RedisResult<Response>>)| async {
            convert_result(receiver.await)
        };

        // TODO - once Value::Error is merged, these will need to be updated to handle the new value.
        match response_policy {
            Some(ResponsePolicy::AllSucceeded) => {
                future::try_join_all(receivers.into_iter().map(get_receiver))
                    .await
                    .and_then(|mut results| {
                        results.pop().ok_or(
                            (
                                ErrorKind::ClusterConnectionNotFound,
                                "No results received for multi-node operation",
                            )
                                .into(),
                        )
                    })
            }
            Some(ResponsePolicy::OneSucceeded) => future::select_ok(
                receivers
                    .into_iter()
                    .map(|tuple| Box::pin(get_receiver(tuple))),
            )
            .await
            .map(|(result, _)| result),
            Some(ResponsePolicy::OneSucceededNonEmpty) => {
                future::select_ok(receivers.into_iter().map(|(_, receiver)| {
                    Box::pin(async move {
                        let result = convert_result(receiver.await)?;
                        match result {
                            Value::Nil => Err((ErrorKind::ResponseError, "no value found").into()),
                            _ => Ok(result),
                        }
                    })
                }))
                .await
                .map(|(result, _)| result)
            }
            Some(ResponsePolicy::Aggregate(op)) => {
                future::try_join_all(receivers.into_iter().map(get_receiver))
                    .await
                    .and_then(|results| crate::cluster_routing::aggregate(results, op))
            }
            Some(ResponsePolicy::AggregateLogical(op)) => {
                future::try_join_all(receivers.into_iter().map(get_receiver))
                    .await
                    .and_then(|results| crate::cluster_routing::logical_aggregate(results, op))
            }
            Some(ResponsePolicy::CombineArrays) => {
                future::try_join_all(receivers.into_iter().map(get_receiver))
                    .await
                    .and_then(|results| match routing {
                        MultipleNodeRoutingInfo::MultiSlot(vec) => {
                            crate::cluster_routing::combine_and_sort_array_results(
                                results,
                                vec.iter().map(|(_, indices)| indices),
                            )
                        }
                        _ => crate::cluster_routing::combine_array_results(results),
                    })
            }
            Some(ResponsePolicy::Special) | None => {
                // This is our assumption - if there's no coherent way to aggregate the responses, we just map each response to the sender, and pass it to the user.

                // TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes.
                future::try_join_all(receivers.into_iter().map(|(addr, receiver)| async move {
                    let result = convert_result(receiver.await)?;
                    Ok((Value::BulkString(addr.into_bytes()), result))
                }))
                .await
                .map(Value::Map)
            }
        }
    }

    async fn execute_on_multiple_nodes<'a>(
        cmd: &'a Arc<Cmd>,
        routing: &'a MultipleNodeRoutingInfo,
        core: Core<C>,
        response_policy: Option<ResponsePolicy>,
    ) -> OperationResult {
        let read_guard = core.conn_lock.read().await;
        if read_guard.0.is_empty() {
            return OperationResult::Err((
                OperationTarget::FanOut,
                (
                    ErrorKind::ClusterConnectionNotFound,
                    "No connections found for multi-node operation",
                )
                    .into(),
            ));
        }
        let (receivers, requests): (Vec<_>, Vec<_>) = {
            let to_request = |(addr, cmd): (&str, Arc<Cmd>)| {
                read_guard.0.get(addr).cloned().map(|conn| {
                    let (sender, receiver) = oneshot::channel();
                    let addr = addr.to_string();
                    (
                        (addr.clone(), receiver),
                        PendingRequest {
                            retry: 0,
                            sender: request::ResultExpectation::External(sender),
                            cmd: CmdArg::Cmd {
                                cmd,
                                routing: InternalSingleNodeRouting::Connection {
                                    identifier: addr,
                                    conn,
                                }
                                .into(),
                            },
                        },
                    )
                })
            };
            let slot_map = &read_guard.1;

            // TODO - these filter_map calls mean that we ignore nodes that are missing. Should we report an error in such cases?
            // Since some of the operators drop other requests, mapping missing nodes to errors here might mean that no request is sent.
            match routing {
                MultipleNodeRoutingInfo::AllNodes => slot_map
                    .addresses_for_all_nodes()
                    .into_iter()
                    .filter_map(|addr| to_request((addr, cmd.clone())))
                    .unzip(),
                MultipleNodeRoutingInfo::AllMasters => slot_map
                    .addresses_for_all_primaries()
                    .into_iter()
                    .filter_map(|addr| to_request((addr, cmd.clone())))
                    .unzip(),
                MultipleNodeRoutingInfo::MultiSlot(routes) => slot_map
                    .addresses_for_multi_slot(routes)
                    .enumerate()
                    .filter_map(|(index, addr_opt)| {
                        addr_opt.and_then(|addr| {
                            let (_, indices) = routes.get(index).unwrap();
                            let cmd =
                                Arc::new(crate::cluster_routing::command_for_multi_slot_indices(
                                    cmd.as_ref(),
                                    indices.iter(),
                                ));
                            to_request((addr, cmd))
                        })
                    })
                    .unzip(),
            }
        };
        drop(read_guard);
        core.pending_requests.lock().unwrap().extend(requests);

        Self::aggregate_results(receivers, routing, response_policy)
            .await
            .map(Response::Single)
            .map_err(|err| (OperationTarget::FanOut, err))
    }

    async fn try_cmd_request(
        cmd: Arc<Cmd>,
        routing: InternalRoutingInfo<C>,
        core: Core<C>,
    ) -> OperationResult {
        let route = match routing {
            InternalRoutingInfo::SingleNode(single_node_routing) => single_node_routing,
            InternalRoutingInfo::MultiNode((multi_node_routing, response_policy)) => {
                return Self::execute_on_multiple_nodes(
                    &cmd,
                    &multi_node_routing,
                    core,
                    response_policy,
                )
                .await;
            }
        };

        match Self::get_connection(route, core).await {
            Ok((addr, mut conn)) => conn
                .req_packed_command(&cmd)
                .await
                .and_then(|value| value.extract_error())
                .map(Response::Single)
                .map_err(|err| (addr.into(), err)),
            Err(err) => Err((OperationTarget::NotFound, err)),
        }
    }

    async fn try_pipeline_request(
        pipeline: Arc<crate::Pipeline>,
        offset: usize,
        count: usize,
        conn: impl Future<Output = RedisResult<(String, C)>>,
    ) -> OperationResult {
        match conn.await {
            Ok((addr, mut conn)) => conn
                .req_packed_commands(&pipeline, offset, count)
                .await
                .and_then(Value::extract_error_vec)
                .map(Response::Multiple)
                .map_err(|err| (OperationTarget::Node { address: addr }, err)),
            Err(err) => Err((OperationTarget::NotFound, err)),
        }
    }

    async fn try_request(cmd: CmdArg<C>, core: Core<C>) -> OperationResult {
        match cmd {
            CmdArg::Cmd { cmd, routing } => Self::try_cmd_request(cmd, routing, core).await,
            CmdArg::Pipeline {
                pipeline,
                offset,
                count,
                route,
            } => {
                Self::try_pipeline_request(
                    pipeline,
                    offset,
                    count,
                    Self::get_connection(route, core),
                )
                .await
            }
        }
    }

    async fn get_connection(
        route: InternalSingleNodeRouting<C>,
        core: Core<C>,
    ) -> RedisResult<(String, C)> {
        let read_guard = core.conn_lock.read().await;

        let conn = match route {
            InternalSingleNodeRouting::Random => None,
            InternalSingleNodeRouting::SpecificNode(route) => read_guard
                .1
                .slot_addr_for_route(&route)
                .map(|addr| addr.to_string()),
            InternalSingleNodeRouting::Connection { identifier, conn } => {
                return Ok((identifier, conn.await));
            }
            InternalSingleNodeRouting::Redirect { redirect, .. } => {
                drop(read_guard);
                // redirected requests shouldn't use a random connection, so they have a separate codepath.
                return Self::get_redirected_connection(redirect, core).await;
            }
            InternalSingleNodeRouting::ByAddress(address) => {
                if let Some(conn) = read_guard.0.get(&address).cloned() {
                    return Ok((address, conn.await));
                } else {
                    return Err((
                        ErrorKind::ClientError,
                        "Requested connection not found",
                        address,
                    )
                        .into());
                }
            }
        }
        .map(|addr| {
            let conn = read_guard.0.get(&addr).cloned();
            (addr, conn)
        });
        drop(read_guard);

        let addr_conn_option = match conn {
            Some((addr, Some(conn))) => Some((addr, conn.await)),
            Some((addr, None)) => connect_check_and_add(core.clone(), addr.clone())
                .await
                .ok()
                .map(|conn| (addr, conn)),
            None => None,
        };

        let (addr, conn) = match addr_conn_option {
            Some(tuple) => tuple,
            None => {
                let read_guard = core.conn_lock.read().await;
                if let Some((random_addr, random_conn_future)) =
                    get_random_connection(&read_guard.0)
                {
                    drop(read_guard);
                    (random_addr, random_conn_future.await)
                } else {
                    return Err(
                        (ErrorKind::ClusterConnectionNotFound, "No connections found").into(),
                    );
                }
            }
        };

        Ok((addr, conn))
    }

    async fn get_redirected_connection(
        redirect: Redirect,
        core: Core<C>,
    ) -> RedisResult<(String, C)> {
        let asking = matches!(redirect, Redirect::Ask(_));
        let addr = match redirect {
            Redirect::Moved(addr) => addr,
            Redirect::Ask(addr) => addr,
        };
        let read_guard = core.conn_lock.read().await;
        let conn = read_guard.0.get(&addr).cloned();
        drop(read_guard);
        let mut conn = match conn {
            Some(conn) => conn.await,
            None => connect_check_and_add(core.clone(), addr.clone()).await?,
        };
        if asking {
            let _ = conn
                .req_packed_command(&crate::cmd::cmd("ASKING"))
                .await
                .and_then(|value| value.extract_error());
        }

        Ok((addr, conn))
    }

    fn poll_recover(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), RedisError>> {
        let recover_future = match &mut self.state {
            ConnectionState::PollComplete => return Poll::Ready(Ok(())),
            ConnectionState::Recover(future) => future,
        };
        let res = match recover_future {
            RecoverFuture::RecoverSlots(ref mut future) => match ready!(future.as_mut().poll(cx)) {
                Ok(_) => {
                    trace!("Recovered!");
                    self.state = ConnectionState::PollComplete;
                    Ok(())
                }
                Err(err) => {
                    trace!("Recover slots failed!");
                    *future = Box::pin(Self::refresh_slots(self.inner.clone()));
                    Err(err)
                }
            },
            RecoverFuture::Reconnect(ref mut future) => {
                ready!(future.as_mut().poll(cx));
                trace!("Reconnected connections");
                self.state = ConnectionState::PollComplete;
                Ok(())
            }
        };
        if res.is_ok() {
            self.resubscribe();
        }
        Poll::Ready(res)
    }

    fn poll_complete(&mut self, cx: &mut task::Context<'_>) -> Poll<PollFlushAction> {
        let mut poll_flush_action = PollFlushAction::None;

        let mut pending_requests_guard = self.inner.pending_requests.lock().unwrap();
        if !pending_requests_guard.is_empty() {
            let mut pending_requests = mem::take(&mut *pending_requests_guard);
            for request in pending_requests.drain(..) {
                // Drop the request if no one is waiting for a response, to free up resources for
                // requests callers care about (load shedding). It will be ambiguous whether the
                // request actually goes through regardless.
                if request.sender.is_closed() {
                    continue;
                }

                let future = Self::try_request(request.cmd.clone(), self.inner.clone()).boxed();
                self.in_flight_requests.push(Box::pin(Request {
                    retry_params: self.inner.cluster_params.retry_params.clone(),
                    request: Some(request),
                    future: RequestState::Future { future },
                }));
            }
            *pending_requests_guard = pending_requests;
        }
        drop(pending_requests_guard);

        loop {
            let (request_handling, next) =
                match Pin::new(&mut self.in_flight_requests).poll_next(cx) {
                    Poll::Ready(Some(result)) => result,
                    Poll::Ready(None) | Poll::Pending => break,
                };
            match request_handling {
                Some(Retry::MoveToPending { request }) => {
                    self.inner.pending_requests.lock().unwrap().push(request)
                }
                Some(Retry::Immediately { request }) => {
                    let future = Self::try_request(request.cmd.clone(), self.inner.clone());
                    self.in_flight_requests.push(Box::pin(Request {
                        retry_params: self.inner.cluster_params.retry_params.clone(),
                        request: Some(request),
                        future: RequestState::Future {
                            future: Box::pin(future),
                        },
                    }));
                }
                Some(Retry::AfterSleep {
                    request,
                    sleep_duration,
                }) => {
                    let future = RequestState::Sleep {
                        sleep: boxed_sleep(sleep_duration),
                    };
                    self.in_flight_requests.push(Box::pin(Request {
                        retry_params: self.inner.cluster_params.retry_params.clone(),
                        request: Some(request),
                        future,
                    }));
                }
                None => {}
            };
            poll_flush_action = poll_flush_action.change_state(next);
        }

        if !matches!(poll_flush_action, PollFlushAction::None) || self.in_flight_requests.is_empty()
        {
            Poll::Ready(poll_flush_action)
        } else {
            Poll::Pending
        }
    }

    fn send_refresh_error(&mut self) {
        if self.refresh_error.is_some() {
            if let Some(mut request) = Pin::new(&mut self.in_flight_requests)
                .iter_pin_mut()
                .find(|request| request.request.is_some())
            {
                (*request)
                    .as_mut()
                    .respond(Err(self.refresh_error.take().unwrap()));
            } else if let Some(request) = self.inner.pending_requests.lock().unwrap().pop() {
                request.sender.send(Err(self.refresh_error.take().unwrap()));
            }
        }
    }

    async fn get_or_create_conn(
        addr: &str,
        conn_option: Option<ConnectionFuture<C>>,
        params: &ClusterParams,
    ) -> RedisResult<C> {
        if let Some(conn) = conn_option {
            let mut conn = conn.await;
            match check_connection(&mut conn).await {
                Ok(_) => Ok(conn),
                Err(_) => connect_and_check(addr, params.clone()).await,
            }
        } else {
            connect_and_check(addr, params.clone()).await
        }
    }
}

#[derive(Debug, PartialEq)]
enum PollFlushAction {
    None,
    RebuildSlots,
    Reconnect(Vec<String>),
    ReconnectFromInitialConnections,
}

impl PollFlushAction {
    fn change_state(self, next_state: PollFlushAction) -> PollFlushAction {
        match (self, next_state) {
            (PollFlushAction::None, next_state) => next_state,
            (next_state, PollFlushAction::None) => next_state,
            (PollFlushAction::ReconnectFromInitialConnections, _)
            | (_, PollFlushAction::ReconnectFromInitialConnections) => {
                PollFlushAction::ReconnectFromInitialConnections
            }

            (PollFlushAction::RebuildSlots, _) | (_, PollFlushAction::RebuildSlots) => {
                PollFlushAction::RebuildSlots
            }

            (PollFlushAction::Reconnect(mut addrs), PollFlushAction::Reconnect(new_addrs)) => {
                addrs.extend(new_addrs);
                Self::Reconnect(addrs)
            }
        }
    }
}
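
// A small illustrative test of the precedence rules in `change_state` (a sketch; the module and
// test names below are ours, not part of the original API):
#[cfg(test)]
mod poll_flush_action_precedence_tests {
    use super::PollFlushAction;

    #[test]
    fn rebuild_slots_outranks_reconnect() {
        // RebuildSlots wins over Reconnect, regardless of argument order.
        assert_eq!(
            PollFlushAction::Reconnect(vec!["node1:6379".to_string()])
                .change_state(PollFlushAction::RebuildSlots),
            PollFlushAction::RebuildSlots
        );
    }

    #[test]
    fn reconnect_addresses_are_merged() {
        // Two Reconnect actions merge into one carrying all addresses.
        assert_eq!(
            PollFlushAction::Reconnect(vec!["a:1".to_string()])
                .change_state(PollFlushAction::Reconnect(vec!["b:2".to_string()])),
            PollFlushAction::Reconnect(vec!["a:1".to_string(), "b:2".to_string()])
        );
    }
}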

impl<C> Sink<Message<C>> for ClusterConnInner<C>
where
    C: ConnectionLike + Connect + Clone + Send + Sync + Unpin + 'static,
{
    type Error = ();

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut task::Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, msg: Message<C>) -> Result<(), Self::Error> {
        trace!("start_send");
        let Message { cmd, sender } = msg;

        if let Some(tracker) = &self.inner.subscription_tracker {
            // TODO - benchmark whether checking if the command is a subscription outside of the mutex is more performant.
            let mut tracker = tracker.lock().unwrap();
            match &cmd {
                CmdArg::Cmd { cmd, .. } => tracker.update_with_cmd(cmd.as_ref()),
                CmdArg::Pipeline { pipeline, .. } => {
                    tracker.update_with_pipeline(pipeline.as_ref())
                }
            }
        };

        self.inner
            .pending_requests
            .lock()
            .unwrap()
            .push(PendingRequest {
                retry: 0,
                sender: request::ResultExpectation::External(sender),
                cmd,
            });
        Ok(())
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context,
    ) -> Poll<Result<(), Self::Error>> {
        trace!("poll_flush: {:?}", self.state);
        loop {
            self.send_refresh_error();

            if let Err(err) = ready!(self.as_mut().poll_recover(cx)) {
                // We failed to reconnect. We will try again, but meanwhile report the error
                // where we can, so that we don't get trapped in an infinite loop of
                // reconnect attempts.
                self.refresh_error = Some(err);

                // Give other tasks a chance to progress before we try to recover
                // again. Since the future may not have registered a wake-up, we do so
                // now so the task is not forgotten.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }

            match ready!(self.poll_complete(cx)) {
                PollFlushAction::None => return Poll::Ready(Ok(())),
                PollFlushAction::RebuildSlots => {
                    self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
                        Self::refresh_slots(self.inner.clone()),
                    )));
                }
                PollFlushAction::Reconnect(addrs) => {
                    self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
                        self.refresh_connections(addrs),
                    )));
                }
                PollFlushAction::ReconnectFromInitialConnections => {
                    self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
                        self.reconnect_to_initial_nodes(),
                    )));
                }
            }
        }
    }

    fn poll_close(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context,
    ) -> Poll<Result<(), Self::Error>> {
        // Try to drive any in-flight requests to completion.
        match self.poll_complete(cx) {
            Poll::Ready(PollFlushAction::None) => (),
            Poll::Ready(_) => Err(())?,
            Poll::Pending => (),
        };
        // If we no longer have any requests in flight we are done (skips any reconnection
        // attempts).
        if self.in_flight_requests.is_empty() {
            return Poll::Ready(Ok(()));
        }

        self.poll_flush(cx)
    }
}

impl<C> ConnectionLike for ClusterConnection<C>
where
    C: ConnectionLike + Send + Clone + Unpin + Sync + Connect + 'static,
{
    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
        let routing = RoutingInfo::for_routable(cmd)
            .unwrap_or(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random));
        self.route_command(cmd, routing).boxed()
    }

    fn req_packed_commands<'a>(
        &'a mut self,
        pipeline: &'a crate::Pipeline,
        offset: usize,
        count: usize,
    ) -> RedisFuture<'a, Vec<Value>> {
        async move {
            let route = route_for_pipeline(pipeline)?;
            self.route_pipeline(pipeline, offset, count, route.into())
                .await
        }
        .boxed()
    }

    fn get_db(&self) -> i64 {
        0
    }
}

/// Implements the process of connecting to a Redis server
/// and obtaining a connection handle.
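///
/// A minimal sketch of calling it through the provided [MultiplexedConnection] implementation
/// (assuming a node reachable at `redis://127.0.0.1/`):
///
/// ```rust,no_run
/// use redis::aio::MultiplexedConnection;
/// use redis::{cluster_async::Connect, AsyncConnectionConfig};
///
/// async fn connect_example() -> redis::RedisResult<MultiplexedConnection> {
///     MultiplexedConnection::connect_with_config(
///         "redis://127.0.0.1/",
///         AsyncConnectionConfig::default(),
///     )
///     .await
/// }
/// ```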
pub trait Connect: Sized {
    /// Connect to a node, returning a handle for command execution.
    fn connect_with_config<'a, T>(info: T, config: AsyncConnectionConfig) -> RedisFuture<'a, Self>
    where
        T: IntoConnectionInfo + Send + 'a;
}

impl Connect for MultiplexedConnection {
    fn connect_with_config<'a, T>(info: T, config: AsyncConnectionConfig) -> RedisFuture<'a, Self>
    where
        T: IntoConnectionInfo + Send + 'a,
    {
        async move {
            let connection_info = info.into_connection_info()?;
            let client = crate::Client::open(connection_info)?;
            client
                .get_multiplexed_async_connection_with_config(&config)
                .await
        }
        .boxed()
    }
}

async fn connect_check_and_add<C>(core: Core<C>, addr: String) -> RedisResult<C>
where
    C: ConnectionLike + Connect + Send + Clone + 'static,
{
    match connect_and_check::<C>(&addr, core.cluster_params.clone()).await {
        Ok(conn) => {
            let conn_clone = conn.clone();
            core.conn_lock
                .write()
                .await
                .0
                .insert(addr, async { conn_clone }.boxed().shared());
            Ok(conn)
        }
        Err(err) => Err(err),
    }
}

async fn connect_and_check<C>(node: &str, params: ClusterParams) -> RedisResult<C>
where
    C: ConnectionLike + Connect + Send + 'static,
{
    let read_from_replicas = params.read_from_replicas;
    let connection_timeout = params.connection_timeout;
    let response_timeout = params.response_timeout;
    let push_sender = params.async_push_sender.clone();
    let tcp_settings = params.tcp_settings.clone();
    let info = get_connection_info(node, params)?;
    let mut config = AsyncConnectionConfig::default()
        .set_connection_timeout(connection_timeout)
        .set_tcp_settings(tcp_settings);
    if let Some(response_timeout) = response_timeout {
        config = config.set_response_timeout(response_timeout);
    };
    if let Some(push_sender) = push_sender {
        config = config.set_push_sender_internal(push_sender);
    }
    let mut conn = match C::connect_with_config(info, config).await {
        Ok(conn) => conn,
        Err(err) => {
            warn!("Failed to connect to node: {:?}, due to: {:?}", node, err);
            return Err(err);
        }
    };

    let check = if read_from_replicas {
        // If READONLY is sent to primary nodes, it will have no effect
        cmd("READONLY")
    } else {
        cmd("PING")
    };

    conn.req_packed_command(&check).await?;
    Ok(conn)
}

async fn check_connection<C>(conn: &mut C) -> RedisResult<()>
where
    C: ConnectionLike + Send + 'static,
{
    let mut cmd = Cmd::new();
    cmd.arg("PING");
    cmd.query_async::<String>(conn).await?;
    Ok(())
}

fn get_random_connection<C>(connections: &ConnectionMap<C>) -> Option<(String, ConnectionFuture<C>)>
where
    C: Clone,
{
    connections.keys().choose(&mut rng()).and_then(|addr| {
        connections
            .get(addr)
            .map(|conn| (addr.clone(), conn.clone()))
    })
}