redis/cluster_handling/sync_connection/
mod.rs

1//! This module extends the library to support Redis Cluster.
2//!
3//! The cluster connection is meant to abstract the fact that a cluster is composed of multiple nodes,
4//! and to provide an API which is as close as possible to that of a single node connection. In order to do that,
5//! the cluster connection maintains connections to each node in the Redis/ Valkey cluster, and can route
6//! requests automatically to the relevant nodes. In cases that the cluster connection receives indications
7//! that the cluster topology has changed, it will query nodes in order to find the current cluster topology.
8//! If it disconnects from some nodes, it will automatically reconnect to those nodes.
9//!
10//! Note that pubsub & push sending functionality is not currently provided by this module.
11//!
12//! # Example
13//! ```rust,no_run
14//! use redis::TypedCommands;
15//! use redis::cluster::ClusterClient;
16//!
17//! let nodes = vec!["redis://127.0.0.1:6379/", "redis://127.0.0.1:6378/", "redis://127.0.0.1:6377/"];
18//! let client = ClusterClient::new(nodes).unwrap();
19//! let mut connection = client.get_connection().unwrap();
20//!
21//! connection.set("test", "test_data").unwrap();
22//! let rv = connection.get("test").unwrap().unwrap();
23//!
24//! assert_eq!(rv.as_str(), "test_data");
25//! ```
26//!
27//! # Pipelining
28//! ```rust,no_run
29//! use redis::TypedCommands;
30//! use redis::cluster::{cluster_pipe, ClusterClient};
31//!
32//! let nodes = vec!["redis://127.0.0.1:6379/", "redis://127.0.0.1:6378/", "redis://127.0.0.1:6377/"];
33//! let client = ClusterClient::new(nodes).unwrap();
34//! let mut connection = client.get_connection().unwrap();
35//!
36//! let key = "test";
37//!
38//! cluster_pipe()
39//!     .rpush(key, "123").ignore()
40//!     .ltrim(key, -10, -1).ignore()
41//!     .expire(key, 60).ignore()
42//!     .exec(&mut connection).unwrap();
43//! ```
44//!
45//! # Sending request to specific node
46//! In some cases you'd want to send a request to a specific node in the cluster, instead of
47//! letting the cluster connection decide by itself to which node it should send the request.
48//! This can happen, for example, if you want to send SCAN commands to each node in the cluster.
49//!
50//! ```rust,no_run
51//! use redis::Commands;
52//! use redis::cluster::ClusterClient;
53//! use redis::cluster_routing::{ RoutingInfo, SingleNodeRoutingInfo };
54//!
55//! let nodes = vec!["redis://127.0.0.1:6379/", "redis://127.0.0.1:6378/", "redis://127.0.0.1:6377/"];
56//! let client = ClusterClient::new(nodes).unwrap();
57//! let mut connection = client.get_connection().unwrap();
58//!
59//! let routing_info = RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress{
60//!     host: "redis://127.0.0.1".to_string(),
61//!     port: 6378
62//! });
63//! let _: redis::Value = connection.route_command(&redis::cmd("PING"), routing_info).unwrap();
64//! ```
65use std::cell::RefCell;
66use std::collections::HashSet;
67use std::thread;
68use std::time::Duration;
69
70mod pipeline;
71
72pub use super::client::{ClusterClient, ClusterClientBuilder};
73use super::topology::parse_slots;
74use super::{
75    client::ClusterParams,
76    routing::{Redirect, Route, RoutingInfo},
77    slot_map::{SlotMap, SLOT_SIZE},
78};
79use crate::cluster_handling::{get_connection_info, slot_cmd, split_node_address};
80use crate::cluster_routing::{
81    MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo, SlotAddr,
82};
83use crate::cmd::{cmd, Cmd};
84use crate::connection::{connect, Connection, ConnectionInfo, ConnectionLike};
85use crate::errors::{ErrorKind, RedisError, RetryMethod};
86use crate::parser::parse_redis_value;
87use crate::types::{HashMap, RedisResult, Value};
88use crate::IntoConnectionInfo;
89pub use crate::TlsMode; // Pub for backwards compatibility
90use arcstr::ArcStr;
91use pipeline::UNROUTABLE_ERROR;
92use rand::{rng, seq::IteratorRandom, Rng};
93
94pub use pipeline::{cluster_pipe, ClusterPipeline};
95
96#[derive(Clone)]
97enum Input<'a> {
98    Slice {
99        cmd: &'a [u8],
100        routable: Value,
101    },
102    Cmd(&'a Cmd),
103    Commands {
104        cmd: &'a [u8],
105        offset: usize,
106        count: usize,
107    },
108}
109
110impl<'a> Input<'a> {
111    fn send(&'a self, connection: &mut impl ConnectionLike) -> RedisResult<Output> {
112        match self {
113            Input::Slice { cmd, routable: _ } => connection
114                .req_packed_command(cmd)
115                .and_then(|value| value.extract_error())
116                .map(Output::Single),
117            Input::Cmd(cmd) => connection
118                .req_command(cmd)
119                .and_then(|value| value.extract_error())
120                .map(Output::Single),
121            Input::Commands { cmd, offset, count } => connection
122                .req_packed_commands(cmd, *offset, *count)
123                .and_then(Value::extract_error_vec)
124                .map(Output::Multi),
125        }
126    }
127}
128
129impl Routable for Input<'_> {
130    fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
131        match self {
132            Input::Slice { cmd: _, routable } => routable.arg_idx(idx),
133            Input::Cmd(cmd) => cmd.arg_idx(idx),
134            Input::Commands { .. } => None,
135        }
136    }
137
138    fn position(&self, candidate: &[u8]) -> Option<usize> {
139        match self {
140            Input::Slice { cmd: _, routable } => routable.position(candidate),
141            Input::Cmd(cmd) => cmd.position(candidate),
142            Input::Commands { .. } => None,
143        }
144    }
145}
146
147enum Output {
148    Single(Value),
149    Multi(Vec<Value>),
150}
151
152impl From<Output> for Value {
153    fn from(value: Output) -> Self {
154        match value {
155            Output::Single(value) => value,
156            Output::Multi(values) => Value::Array(values),
157        }
158    }
159}
160
161impl From<Output> for Vec<Value> {
162    fn from(value: Output) -> Self {
163        match value {
164            Output::Single(value) => vec![value],
165            Output::Multi(values) => values,
166        }
167    }
168}
169
170/// Implements the process of connecting to a Redis server
171/// and obtaining and configuring a connection handle.
172pub trait Connect: Sized {
173    /// Connect to a node, returning handle for command execution.
174    fn connect<T>(info: T, timeout: Option<Duration>) -> RedisResult<Self>
175    where
176        T: IntoConnectionInfo;
177
178    /// Sends an already encoded (packed) command into the TCP socket and
179    /// does not read a response.  This is useful for commands like
180    /// `MONITOR` which yield multiple items.  This needs to be used with
181    /// care because it changes the state of the connection.
182    fn send_packed_command(&mut self, cmd: &[u8]) -> RedisResult<()>;
183
184    /// Sets the write timeout for the connection.
185    ///
186    /// If the provided value is `None`, then `send_packed_command` call will
187    /// block indefinitely. It is an error to pass the zero `Duration` to this
188    /// method.
189    fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()>;
190
191    /// Sets the read timeout for the connection.
192    ///
193    /// If the provided value is `None`, then `recv_response` call will
194    /// block indefinitely. It is an error to pass the zero `Duration` to this
195    /// method.
196    fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()>;
197
198    /// Fetches a single response from the connection.  This is useful
199    /// if used in combination with `send_packed_command`.
200    fn recv_response(&mut self) -> RedisResult<Value>;
201}
202
203impl Connect for Connection {
204    fn connect<T>(info: T, timeout: Option<Duration>) -> RedisResult<Self>
205    where
206        T: IntoConnectionInfo,
207    {
208        connect(&info.into_connection_info()?, timeout)
209    }
210
211    fn send_packed_command(&mut self, cmd: &[u8]) -> RedisResult<()> {
212        Self::send_packed_command(self, cmd)
213    }
214
215    fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
216        Self::set_write_timeout(self, dur)
217    }
218
219    fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
220        Self::set_read_timeout(self, dur)
221    }
222
223    fn recv_response(&mut self) -> RedisResult<Value> {
224        Self::recv_response(self)
225    }
226}
227
228/// Options for creation of connection
229#[derive(Clone, Default)]
230pub struct ClusterConfig {
231    pub(crate) connection_timeout: Option<Duration>,
232    pub(crate) response_timeout: Option<Duration>,
233    #[cfg(feature = "cluster-async")]
234    pub(crate) async_push_sender: Option<std::sync::Arc<dyn crate::aio::AsyncPushSender>>,
235    #[cfg(feature = "cluster-async")]
236    pub(crate) async_dns_resolver: Option<std::sync::Arc<dyn crate::io::AsyncDNSResolver>>,
237}
238
239impl ClusterConfig {
240    /// Creates a new instance of the options with nothing set
241    pub fn new() -> Self {
242        Self::default()
243    }
244
245    /// Sets the connection timeout
246    pub fn set_connection_timeout(mut self, connection_timeout: std::time::Duration) -> Self {
247        self.connection_timeout = Some(connection_timeout);
248        self
249    }
250
251    /// Sets the response timeout
252    pub fn set_response_timeout(mut self, response_timeout: std::time::Duration) -> Self {
253        self.response_timeout = Some(response_timeout);
254        self
255    }
256
257    #[cfg(feature = "cluster-async")]
258    /// Sets a sender to receive pushed values.
259    ///
260    /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
261    /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::set_protocol] function.
262    ///
263    /// # Examples
264    ///
265    /// ```rust
266    /// # use redis::cluster::ClusterConfig;
267    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
268    /// let config = ClusterConfig::new().set_push_sender(tx);
269    /// ```
270    ///
271    /// ```rust
272    /// # use std::sync::{Mutex, Arc};
273    /// # use redis::cluster::ClusterConfig;
274    /// let messages = Arc::new(Mutex::new(Vec::new()));
275    /// let config = ClusterConfig::new().set_push_sender(move |msg|{
276    ///     let Ok(mut messages) = messages.lock() else {
277    ///         return Err(redis::aio::SendError);
278    ///     };
279    ///     messages.push(msg);
280    ///     Ok(())
281    /// });
282    pub fn set_push_sender(mut self, sender: impl crate::aio::AsyncPushSender) -> Self {
283        self.async_push_sender = Some(std::sync::Arc::new(sender));
284        self
285    }
286
287    /// Set asynchronous DNS resolver for the underlying TCP connection.
288    ///
289    /// The parameter resolver must implement the [`crate::io::AsyncDNSResolver`] trait.
290    #[cfg(feature = "cluster-async")]
291    pub fn set_dns_resolver(mut self, resolver: impl crate::io::AsyncDNSResolver) -> Self {
292        self.async_dns_resolver = Some(std::sync::Arc::new(resolver));
293        self
294    }
295}
296
297/// This represents a Redis Cluster connection.
298///
299/// It stores the underlying connections maintained for each node in the cluster,
300/// as well as common parameters for connecting to nodes and executing commands.
301pub struct ClusterConnection<C = Connection> {
302    initial_nodes: Vec<ConnectionInfo>,
303    connections: RefCell<HashMap<ArcStr, C>>,
304    slots: RefCell<SlotMap>,
305    auto_reconnect: RefCell<bool>,
306    read_timeout: RefCell<Option<Duration>>,
307    write_timeout: RefCell<Option<Duration>>,
308    cluster_params: ClusterParams,
309}
310
311impl<C> ClusterConnection<C>
312where
313    C: ConnectionLike + Connect,
314{
315    pub(crate) fn new(
316        cluster_params: ClusterParams,
317        initial_nodes: Vec<ConnectionInfo>,
318    ) -> RedisResult<Self> {
319        let connection = Self {
320            connections: RefCell::new(HashMap::new()),
321            slots: RefCell::new(SlotMap::new(cluster_params.read_from_replicas)),
322            auto_reconnect: RefCell::new(true),
323            read_timeout: RefCell::new(cluster_params.response_timeout),
324            write_timeout: RefCell::new(None),
325            initial_nodes: initial_nodes.to_vec(),
326            cluster_params,
327        };
328        connection.create_initial_connections()?;
329
330        Ok(connection)
331    }
332
333    /// Set an auto reconnect attribute.
334    /// Default value is true;
335    pub fn set_auto_reconnect(&self, value: bool) {
336        let mut auto_reconnect = self.auto_reconnect.borrow_mut();
337        *auto_reconnect = value;
338    }
339
340    /// Sets the write timeout for the connection.
341    ///
342    /// If the provided value is `None`, then `send_packed_command` call will
343    /// block indefinitely. It is an error to pass the zero `Duration` to this
344    /// method.
345    pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
346        // Check if duration is valid before updating local value.
347        if dur.is_some() && dur.unwrap().is_zero() {
348            return Err(RedisError::from((
349                ErrorKind::InvalidClientConfig,
350                "Duration should be None or non-zero.",
351            )));
352        }
353
354        let mut t = self.write_timeout.borrow_mut();
355        *t = dur;
356        let connections = self.connections.borrow();
357        for conn in connections.values() {
358            conn.set_write_timeout(dur)?;
359        }
360        Ok(())
361    }
362
363    /// Sets the read timeout for the connection.
364    ///
365    /// If the provided value is `None`, then `recv_response` call will
366    /// block indefinitely. It is an error to pass the zero `Duration` to this
367    /// method.
368    pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
369        // Check if duration is valid before updating local value.
370        if dur.is_some() && dur.unwrap().is_zero() {
371            return Err(RedisError::from((
372                ErrorKind::InvalidClientConfig,
373                "Duration should be None or non-zero.",
374            )));
375        }
376
377        let mut t = self.read_timeout.borrow_mut();
378        *t = dur;
379        let connections = self.connections.borrow();
380        for conn in connections.values() {
381            conn.set_read_timeout(dur)?;
382        }
383        Ok(())
384    }
385
386    /// Check that all connections it has are available (`PING` internally).
387    #[doc(hidden)]
388    pub fn check_connection(&mut self) -> bool {
389        <Self as ConnectionLike>::check_connection(self)
390    }
391
392    pub(crate) fn execute_pipeline(&mut self, pipe: &ClusterPipeline) -> RedisResult<Vec<Value>> {
393        self.send_recv_and_retry_cmds(pipe.commands())
394    }
395
396    /// Returns the connection status.
397    ///
398    /// The connection is open until any `read_response` call received an
399    /// invalid response from the server (most likely a closed or dropped
400    /// connection, otherwise a Redis protocol error). When using unix
401    /// sockets the connection is open until writing a command failed with a
402    /// `BrokenPipe` error.
403    fn create_initial_connections(&self) -> RedisResult<()> {
404        let mut connections = HashMap::with_capacity(self.initial_nodes.len());
405        let mut failed_connections = Vec::new();
406
407        for info in self.initial_nodes.iter() {
408            let addr = info.addr.to_string().into();
409
410            match self.connect(&addr) {
411                Ok(mut conn) => {
412                    if conn.check_connection() {
413                        connections.insert(addr, conn);
414                        break;
415                    } else {
416                        failed_connections.push((
417                            addr,
418                            RedisError::from((
419                                ErrorKind::Io,
420                                "Node failed to respond to connection check,",
421                            )),
422                        ));
423                    }
424                }
425                Err(conn_err) => {
426                    failed_connections.push((addr, conn_err));
427                }
428            }
429        }
430
431        if connections.is_empty() {
432            // Create a composite description of why connecting to each node failed.
433            let detail = if failed_connections.is_empty() {
434                "List of initial nodes is empty".to_string()
435            } else {
436                let mut formatted_detail = "Failed to connect to each cluster node (".to_string();
437
438                for (index, (addr, conn_err)) in failed_connections.into_iter().enumerate() {
439                    if index != 0 {
440                        formatted_detail += "; ";
441                    }
442                    use std::fmt::Write;
443                    let _ = write!(&mut formatted_detail, "{addr}: {conn_err}");
444                }
445                formatted_detail += ")";
446                formatted_detail
447            };
448
449            return Err(RedisError::from((
450                ErrorKind::Io,
451                "It failed to check startup nodes.",
452                detail,
453            )));
454        }
455
456        *self.connections.borrow_mut() = connections;
457        self.refresh_slots()?;
458        Ok(())
459    }
460
461    // Query a node to discover slot-> master mappings.
462    fn refresh_slots(&self) -> RedisResult<()> {
463        let mut slots = self.slots.borrow_mut();
464        *slots = self.create_new_slots()?;
465
466        let mut nodes = slots.values().flatten().collect::<Vec<_>>();
467        nodes.sort_unstable();
468        nodes.dedup();
469
470        let mut connections = self.connections.borrow_mut();
471        *connections = nodes
472            .into_iter()
473            .filter_map(|addr| {
474                if let Some(mut conn) = connections.remove(addr) {
475                    if conn.check_connection() {
476                        return Some((addr.clone(), conn));
477                    }
478                }
479
480                if let Ok(mut conn) = self.connect(addr) {
481                    if conn.check_connection() {
482                        return Some((addr.clone(), conn));
483                    }
484                }
485
486                None
487            })
488            .collect();
489
490        Ok(())
491    }
492
493    fn create_new_slots(&self) -> RedisResult<SlotMap> {
494        let mut connections = self.connections.borrow_mut();
495        let mut new_slots = None;
496
497        for (addr, conn) in connections.iter_mut() {
498            let value = conn.req_command(&slot_cmd())?;
499            if let Ok(slots_data) = parse_slots(value, addr.rsplit_once(':').unwrap().0) {
500                new_slots = Some(SlotMap::from_slots(
501                    slots_data,
502                    self.cluster_params.read_from_replicas,
503                ));
504                break;
505            }
506        }
507
508        match new_slots {
509            Some(new_slots) => Ok(new_slots),
510            None => Err(RedisError::from((
511                ErrorKind::Client,
512                "Slot refresh error. didn't get any slots from server",
513            ))),
514        }
515    }
516
517    fn connect(&self, node: &ArcStr) -> RedisResult<C> {
518        let info = get_connection_info(node, &self.cluster_params)?;
519
520        let mut conn = C::connect(info, Some(self.cluster_params.connection_timeout))?;
521        if self.cluster_params.read_from_replicas {
522            // If READONLY is sent to primary nodes, it will have no effect
523            cmd("READONLY").exec(&mut conn)?;
524        }
525        conn.set_read_timeout(*self.read_timeout.borrow())?;
526        conn.set_write_timeout(*self.write_timeout.borrow())?;
527        Ok(conn)
528    }
529
530    fn get_connection<'a>(
531        &self,
532        connections: &'a mut HashMap<ArcStr, C>,
533        route: &Route,
534    ) -> (ArcStr, RedisResult<&'a mut C>) {
535        let slots = self.slots.borrow();
536        if let Some(addr) = slots.slot_addr_for_route(route) {
537            (addr.clone(), self.get_connection_by_addr(connections, addr))
538        } else {
539            // try a random node next.  This is safe if slots are involved
540            // as a wrong node would reject the request.
541            get_random_connection_or_error(connections)
542        }
543    }
544
545    fn get_connection_by_addr<'a>(
546        &self,
547        connections: &'a mut HashMap<ArcStr, C>,
548        addr: &ArcStr,
549    ) -> RedisResult<&'a mut C> {
550        match connections.entry(addr.clone()) {
551            std::collections::hash_map::Entry::Occupied(occupied_entry) => {
552                Ok(occupied_entry.into_mut())
553            }
554            std::collections::hash_map::Entry::Vacant(vacant_entry) => {
555                // Create new connection.
556                // TODO: error handling
557                let conn = self.connect(addr)?;
558                Ok(vacant_entry.insert(conn))
559            }
560        }
561    }
562
563    fn get_addr_for_cmd(&self, cmd: &Cmd) -> RedisResult<ArcStr> {
564        let slots = self.slots.borrow();
565
566        let addr_for_slot = |route: Route| -> RedisResult<ArcStr> {
567            let slot_addr = slots
568                .slot_addr_for_route(&route)
569                .ok_or((ErrorKind::Client, "Missing slot coverage"))?;
570            Ok(slot_addr.clone())
571        };
572
573        match RoutingInfo::for_routable(cmd) {
574            Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => {
575                let mut rng = rng();
576                Ok(addr_for_slot(Route::new(
577                    rng.random_range(0..SLOT_SIZE),
578                    SlotAddr::Master,
579                ))?)
580            }
581            Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(route))) => {
582                Ok(addr_for_slot(route)?)
583            }
584            _ => fail!(UNROUTABLE_ERROR),
585        }
586    }
587
588    fn map_cmds_to_nodes(&self, cmds: &[Cmd]) -> RedisResult<Vec<NodeCmd>> {
589        let mut cmd_map: HashMap<ArcStr, NodeCmd> = HashMap::new();
590
591        for (idx, cmd) in cmds.iter().enumerate() {
592            let addr = self.get_addr_for_cmd(cmd)?;
593            let nc = cmd_map
594                .entry(addr.clone())
595                .or_insert_with(|| NodeCmd::new(addr));
596            nc.indexes.push(idx);
597            cmd.write_packed_command(&mut nc.pipe);
598        }
599
600        let mut result = Vec::new();
601        for (_, v) in cmd_map.drain() {
602            result.push(v);
603        }
604        Ok(result)
605    }
606
607    fn execute_on_all<'a>(
608        &'a self,
609        input: Input,
610        addresses: HashSet<&'a ArcStr>,
611    ) -> Vec<RedisResult<(&'a ArcStr, Value)>> {
612        addresses
613            .into_iter()
614            .map(|addr| {
615                let (host, port) = split_node_address(addr).unwrap();
616                self.request(
617                    input.clone(),
618                    Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
619                        host: host.to_string(),
620                        port,
621                    })),
622                )
623                .map(|res| match res {
624                    Output::Single(value) => (addr, value),
625                    // technically this shouldn't be possible, but I prefer not to crash here.
626                    Output::Multi(values) => (addr, Value::Array(values)),
627                })
628            })
629            .collect()
630    }
631
632    fn execute_on_all_nodes<'a>(
633        &'a self,
634        input: Input,
635        slots: &'a mut SlotMap,
636    ) -> Vec<RedisResult<(&'a ArcStr, Value)>> {
637        self.execute_on_all(input, slots.addresses_for_all_nodes())
638    }
639
640    fn execute_on_all_primaries<'a>(
641        &'a self,
642        input: Input,
643        slots: &'a mut SlotMap,
644    ) -> Vec<RedisResult<(&'a ArcStr, Value)>> {
645        self.execute_on_all(input, slots.addresses_for_all_primaries())
646    }
647
648    fn execute_multi_slot<'a, 'b>(
649        &'a self,
650        input: Input,
651        slots: &'a mut SlotMap,
652        connections: &'a mut HashMap<ArcStr, C>,
653        routes: &'b [(Route, Vec<usize>)],
654    ) -> Vec<RedisResult<(&'a ArcStr, Value)>>
655    where
656        'b: 'a,
657    {
658        slots
659            .addresses_for_multi_slot(routes)
660            .enumerate()
661            .map(|(index, addr)| {
662                let addr = addr.ok_or(RedisError::from((
663                    ErrorKind::Io,
664                    "Couldn't find connection",
665                )))?;
666                let connection = self.get_connection_by_addr(connections, addr)?;
667                let (_, indices) = routes.get(index).unwrap();
668                let cmd =
669                    crate::cluster_routing::command_for_multi_slot_indices(&input, indices.iter());
670                connection.req_command(&cmd).map(|res| (addr, res))
671            })
672            .collect()
673    }
674
675    fn execute_on_multiple_nodes(
676        &self,
677        input: Input,
678        routing: MultipleNodeRoutingInfo,
679        response_policy: Option<ResponsePolicy>,
680    ) -> RedisResult<Value> {
681        let mut connections = self.connections.borrow_mut();
682        let mut slots = self.slots.borrow_mut();
683
684        let results = match &routing {
685            MultipleNodeRoutingInfo::MultiSlot((routes, _)) => {
686                self.execute_multi_slot(input, &mut slots, &mut connections, routes)
687            }
688            MultipleNodeRoutingInfo::AllMasters => {
689                drop(connections);
690                self.execute_on_all_primaries(input, &mut slots)
691            }
692            MultipleNodeRoutingInfo::AllNodes => {
693                drop(connections);
694                self.execute_on_all_nodes(input, &mut slots)
695            }
696        };
697
698        match response_policy {
699            Some(ResponsePolicy::AllSucceeded) => {
700                let mut last_result = None;
701                for result in results {
702                    last_result = Some(result?);
703                }
704
705                last_result
706                    .ok_or(
707                        (
708                            ErrorKind::ClusterConnectionNotFound,
709                            "No results received for multi-node operation",
710                        )
711                            .into(),
712                    )
713                    .map(|(_, res)| res)
714            }
715            Some(ResponsePolicy::OneSucceeded) => {
716                let mut last_failure = None;
717
718                for result in results {
719                    match result {
720                        Ok((_, val)) => return Ok(val),
721                        Err(err) => last_failure = Some(err),
722                    }
723                }
724
725                Err(last_failure
726                    .unwrap_or_else(|| (ErrorKind::Io, "Couldn't find a connection").into()))
727            }
728            Some(ResponsePolicy::CombineMaps) => crate::cluster_routing::combine_map_results(
729                results
730                    .into_iter()
731                    .map(|result| result.map(|(_, value)| value))
732                    .collect::<RedisResult<Vec<_>>>()?,
733            ),
734            Some(ResponsePolicy::FirstSucceededNonEmptyOrAllEmpty) => {
735                // Attempt to return the first result that isn't `Nil` or an error.
736                // If no such response is found and all servers returned `Nil`, it indicates that all shards are empty, so return `Nil`.
737                // If we received only errors, return the last received error.
738                // If we received a mix of errors and `Nil`s, we can't determine if all shards are empty,
739                // thus we return the last received error instead of `Nil`.
740                let mut last_failure = None;
741                let num_of_results = results.len();
742                let mut nil_counter = 0;
743                for result in results {
744                    match result.map(|(_, res)| res) {
745                        Ok(Value::Nil) => nil_counter += 1,
746                        Ok(val) => return Ok(val),
747                        Err(err) => last_failure = Some(err),
748                    }
749                }
750                if nil_counter == num_of_results {
751                    Ok(Value::Nil)
752                } else {
753                    Err(last_failure
754                        .unwrap_or_else(|| (ErrorKind::Io, "Couldn't find a connection").into()))
755                }
756            }
757            Some(ResponsePolicy::Aggregate(op)) => {
758                let results = results
759                    .into_iter()
760                    .map(|res| res.map(|(_, val)| val))
761                    .collect::<RedisResult<Vec<_>>>()?;
762                crate::cluster_routing::aggregate(results, op)
763            }
764            Some(ResponsePolicy::AggregateLogical(op)) => {
765                let results = results
766                    .into_iter()
767                    .map(|res| res.map(|(_, val)| val))
768                    .collect::<RedisResult<Vec<_>>>()?;
769                crate::cluster_routing::logical_aggregate(results, op)
770            }
771            Some(ResponsePolicy::CombineArrays) => {
772                let results = results
773                    .into_iter()
774                    .map(|res| res.map(|(_, val)| val))
775                    .collect::<RedisResult<Vec<_>>>()?;
776                match routing {
777                    MultipleNodeRoutingInfo::MultiSlot((vec, pattern)) => {
778                        crate::cluster_routing::combine_and_sort_array_results(
779                            results, &vec, &pattern,
780                        )
781                    }
782                    _ => crate::cluster_routing::combine_array_results(results),
783                }
784            }
785            Some(ResponsePolicy::Special) | None => {
786                // This is our assumption - if there's no coherent way to aggregate the responses, we just map each response to the sender, and pass it to the user.
787                // TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes.
788                let results = results
789                    .into_iter()
790                    .map(|result| {
791                        result.map(|(addr, val)| (Value::BulkString(addr.as_bytes().to_vec()), val))
792                    })
793                    .collect::<RedisResult<Vec<_>>>()?;
794                Ok(Value::Map(results))
795            }
796        }
797    }
798
799    #[allow(clippy::unnecessary_unwrap)]
800    fn request(&self, input: Input, route_option: Option<RoutingInfo>) -> RedisResult<Output> {
801        let single_node_routing = match route_option {
802            Some(RoutingInfo::SingleNode(single_node_routing)) => single_node_routing,
803            Some(RoutingInfo::MultiNode((multi_node_routing, response_policy))) => {
804                return self
805                    .execute_on_multiple_nodes(input, multi_node_routing, response_policy)
806                    .map(Output::Single);
807            }
808            None => fail!(UNROUTABLE_ERROR),
809        };
810
811        let mut retries = 0;
812        let mut redirected = None::<Redirect>;
813
814        loop {
815            // Get target address and response.
816            let (addr, rv) = {
817                let mut connections = self.connections.borrow_mut();
818                let (addr, conn) = if let Some(redirected) = redirected.take() {
819                    let (addr, is_asking) = match redirected {
820                        Redirect::Moved(addr) => (addr, false),
821                        Redirect::Ask(addr) => (addr, true),
822                    };
823                    let mut conn = self.get_connection_by_addr(&mut connections, &addr);
824                    if is_asking {
825                        // if we are in asking mode we want to feed a single
826                        // ASKING command into the connection before what we
827                        // actually want to execute.
828                        conn = conn.and_then(|conn| {
829                            conn.req_packed_command(&b"*1\r\n$6\r\nASKING\r\n"[..])
830                                .and_then(|value| value.extract_error())?;
831                            Ok(conn)
832                        });
833                    }
834                    (addr, conn)
835                } else {
836                    match &single_node_routing {
837                        SingleNodeRoutingInfo::Random => {
838                            get_random_connection_or_error(&mut connections)
839                        }
840                        SingleNodeRoutingInfo::SpecificNode(route) => {
841                            self.get_connection(&mut connections, route)
842                        }
843                        SingleNodeRoutingInfo::ByAddress { host, port } => {
844                            let address = format!("{host}:{port}").into();
845                            let conn = self.get_connection_by_addr(&mut connections, &address);
846                            (address, conn)
847                        }
848                        SingleNodeRoutingInfo::RandomPrimary => {
849                            self.get_connection(&mut connections, &Route::new_random_primary())
850                        }
851                    }
852                };
853                (addr, conn.and_then(|conn| input.send(conn)))
854            };
855
856            match rv {
857                Ok(rv) => return Ok(rv),
858                Err(err) => {
859                    if err.kind() == ErrorKind::ClusterConnectionNotFound
860                        && *self.auto_reconnect.borrow()
861                    {
862                        for node in &self.initial_nodes {
863                            let addr = node.addr.to_string().into();
864                            if let Ok(mut conn) = self.connect(&addr) {
865                                if conn.check_connection() {
866                                    self.connections.borrow_mut().insert(addr, conn);
867                                }
868                            }
869                        }
870                        self.refresh_slots()?;
871                    }
872
873                    if retries == self.cluster_params.retry_params.number_of_retries {
874                        return Err(err);
875                    }
876                    retries += 1;
877
878                    match err.retry_method() {
879                        RetryMethod::AskRedirect => {
880                            redirected = err
881                                .redirect_node()
882                                .map(|(node, _slot)| Redirect::Ask(node.into()));
883                        }
884                        RetryMethod::MovedRedirect => {
885                            // Refresh slots.
886                            self.refresh_slots()?;
887                            // Request again.
888                            redirected = err
889                                .redirect_node()
890                                .map(|(node, _slot)| Redirect::Moved(node.into()));
891                        }
892                        RetryMethod::WaitAndRetry => {
893                            // Sleep and retry.
894                            let sleep_time = self
895                                .cluster_params
896                                .retry_params
897                                .wait_time_for_retry(retries);
898                            thread::sleep(sleep_time);
899                        }
900                        RetryMethod::Reconnect => {
901                            if *self.auto_reconnect.borrow() {
902                                // if the connection is no longer valid, we should remove it.
903                                self.connections.borrow_mut().remove(&addr);
904                                if let Ok(mut conn) = self.connect(&addr) {
905                                    if conn.check_connection() {
906                                        self.connections.borrow_mut().insert(addr, conn);
907                                    }
908                                }
909                            }
910                        }
911                        RetryMethod::NoRetry => {
912                            return Err(err);
913                        }
914                        RetryMethod::RetryImmediately => {}
915                        RetryMethod::ReconnectFromInitialConnections => {
916                            // TODO - implement reconnect from initial connections
917                            if *self.auto_reconnect.borrow() {
918                                if let Ok(mut conn) = self.connect(&addr) {
919                                    if conn.check_connection() {
920                                        self.connections.borrow_mut().insert(addr, conn);
921                                    }
922                                }
923                            }
924                        }
925                    }
926                }
927            }
928        }
929    }
930
931    fn send_recv_and_retry_cmds(&self, cmds: &[Cmd]) -> RedisResult<Vec<Value>> {
932        // Vector to hold the results, pre-populated with `Nil` values. This allows the original
933        // cmd ordering to be re-established by inserting the response directly into the result
934        // vector (e.g., results[10] = response).
935        let mut results = vec![Value::Nil; cmds.len()];
936
937        let to_retry = self
938            .send_all_commands(cmds)
939            .and_then(|node_cmds| self.recv_all_commands(&mut results, &node_cmds))?;
940
941        if to_retry.is_empty() {
942            return Ok(results);
943        }
944
945        // Refresh the slots to ensure that we have a clean slate for the retry attempts.
946        self.refresh_slots()?;
947
948        // Given that there are commands that need to be retried, it means something in the cluster
949        // topology changed. Execute each command separately to take advantage of the existing
950        // retry logic that handles these cases.
951        for retry_idx in to_retry {
952            let cmd = &cmds[retry_idx];
953            let routing = RoutingInfo::for_routable(cmd);
954            results[retry_idx] = self.request(Input::Cmd(cmd), routing)?.into();
955        }
956        Ok(results)
957    }
958
959    // Build up a pipeline per node, then send it
960    fn send_all_commands(&self, cmds: &[Cmd]) -> RedisResult<Vec<NodeCmd>> {
961        let mut connections = self.connections.borrow_mut();
962
963        let node_cmds = self.map_cmds_to_nodes(cmds)?;
964        for nc in &node_cmds {
965            self.get_connection_by_addr(&mut connections, &nc.addr)?
966                .send_packed_command(&nc.pipe)?;
967        }
968        Ok(node_cmds)
969    }
970
971    // Receive from each node, keeping track of which commands need to be retried.
972    fn recv_all_commands(
973        &self,
974        results: &mut [Value],
975        node_cmds: &[NodeCmd],
976    ) -> RedisResult<Vec<usize>> {
977        let mut to_retry = Vec::new();
978        let mut connections = self.connections.borrow_mut();
979        let mut first_err = None;
980
981        for nc in node_cmds {
982            for cmd_idx in &nc.indexes {
983                match self
984                    .get_connection_by_addr(&mut connections, &nc.addr)?
985                    .recv_response()
986                {
987                    Ok(item) => results[*cmd_idx] = item,
988                    Err(err) if err.is_cluster_error() => to_retry.push(*cmd_idx),
989                    Err(err) => first_err = first_err.or(Some(err)),
990                }
991            }
992        }
993        match first_err {
994            Some(err) => Err(err),
995            None => Ok(to_retry),
996        }
997    }
998
999    /// Send a command to the given `routing`.
1000    pub fn route_command(&mut self, cmd: &Cmd, routing: RoutingInfo) -> RedisResult<Value> {
1001        self.request(Input::Cmd(cmd), Some(routing))
1002            .map(|res| res.into())
1003    }
1004}
1005
1006const MULTI: &[u8] = "*1\r\n$5\r\nMULTI\r\n".as_bytes();
1007impl<C: Connect + ConnectionLike> ConnectionLike for ClusterConnection<C> {
1008    fn supports_pipelining(&self) -> bool {
1009        false
1010    }
1011
1012    fn req_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
1013        if cmd.is_empty() {
1014            return Err(RedisError::make_empty_command());
1015        }
1016        let routing = RoutingInfo::for_routable(cmd);
1017        self.request(Input::Cmd(cmd), routing).map(|res| res.into())
1018    }
1019
1020    fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
1021        if cmd.is_empty() {
1022            return Err(RedisError::make_empty_command());
1023        }
1024        let actual_cmd = if cmd.starts_with(MULTI) {
1025            &cmd[MULTI.len()..]
1026        } else {
1027            cmd
1028        };
1029        let value = parse_redis_value(actual_cmd)?;
1030        let routing = RoutingInfo::for_routable(&value);
1031        self.request(
1032            Input::Slice {
1033                cmd,
1034                routable: value,
1035            },
1036            routing,
1037        )
1038        .map(|res| res.into())
1039    }
1040
1041    fn req_packed_commands(
1042        &mut self,
1043        cmd: &[u8],
1044        offset: usize,
1045        count: usize,
1046    ) -> RedisResult<Vec<Value>> {
1047        if cmd.is_empty() {
1048            return Err(RedisError::make_empty_command());
1049        }
1050        let actual_cmd = if cmd.starts_with(MULTI) {
1051            &cmd[MULTI.len()..]
1052        } else {
1053            cmd
1054        };
1055        let value = parse_redis_value(actual_cmd)?;
1056        let route = match RoutingInfo::for_routable(&value) {
1057            // we don't allow routing multiple commands to multiple nodes.
1058            Some(RoutingInfo::MultiNode(_)) => None,
1059            Some(RoutingInfo::SingleNode(route)) => Some(route),
1060            None => None,
1061        }
1062        .unwrap_or(SingleNodeRoutingInfo::Random);
1063        self.request(
1064            Input::Commands { cmd, offset, count },
1065            Some(RoutingInfo::SingleNode(route)),
1066        )
1067        .map(|res| res.into())
1068    }
1069
1070    fn get_db(&self) -> i64 {
1071        0
1072    }
1073
1074    fn is_open(&self) -> bool {
1075        let connections = self.connections.borrow();
1076        for conn in connections.values() {
1077            if !conn.is_open() {
1078                return false;
1079            }
1080        }
1081        true
1082    }
1083
1084    fn check_connection(&mut self) -> bool {
1085        let mut connections = self.connections.borrow_mut();
1086        for conn in connections.values_mut() {
1087            if !conn.check_connection() {
1088                return false;
1089            }
1090        }
1091        true
1092    }
1093}
1094
1095#[derive(Debug)]
1096struct NodeCmd {
1097    // The original command indexes
1098    indexes: Vec<usize>,
1099    pipe: Vec<u8>,
1100    addr: ArcStr,
1101}
1102
1103impl NodeCmd {
1104    fn new(a: ArcStr) -> NodeCmd {
1105        NodeCmd {
1106            indexes: vec![],
1107            pipe: vec![],
1108            addr: a,
1109        }
1110    }
1111}
1112
1113fn get_random_connection<C: ConnectionLike + Connect + Sized>(
1114    connections: &mut HashMap<ArcStr, C>,
1115) -> Option<(ArcStr, &mut C)> {
1116    connections
1117        .iter_mut()
1118        .choose(&mut rng())
1119        .map(|(addr, conn)| (addr.clone(), conn))
1120}
1121
1122fn get_random_connection_or_error<C: ConnectionLike + Connect + Sized>(
1123    connections: &mut HashMap<ArcStr, C>,
1124) -> (ArcStr, RedisResult<&mut C>) {
1125    match get_random_connection(connections) {
1126        Some((addr, conn)) => (addr, Ok(conn)),
1127        None => (
1128            // we need to add a fake address in order for the error to be handled - the code that uses it assumes there's an address attached.
1129            String::new().into(),
1130            Err(RedisError::from((
1131                ErrorKind::ClusterConnectionNotFound,
1132                "No connections found",
1133            ))),
1134        ),
1135    }
1136}
1137
1138#[cfg(test)]
1139mod tests {
1140    use crate::ConnectionAddr;
1141
1142    use super::*;
1143
1144    #[test]
1145    fn parse_cluster_node_host_port() {
1146        let cases = vec![
1147            (
1148                "127.0.0.1:6379",
1149                ConnectionAddr::Tcp("127.0.0.1".to_string(), 6379u16),
1150            ),
1151            (
1152                "localhost.localdomain:6379",
1153                ConnectionAddr::Tcp("localhost.localdomain".to_string(), 6379u16),
1154            ),
1155            (
1156                "dead::cafe:beef:30001",
1157                ConnectionAddr::Tcp("dead::cafe:beef".to_string(), 30001u16),
1158            ),
1159            (
1160                "[fe80::cafe:beef%en1]:30001",
1161                ConnectionAddr::Tcp("fe80::cafe:beef%en1".to_string(), 30001u16),
1162            ),
1163        ];
1164
1165        for (input, expected) in cases {
1166            let res = get_connection_info(input, &ClusterParams::default());
1167            assert_eq!(res.unwrap().addr, expected);
1168        }
1169
1170        let cases = vec![":0", "[]:6379"];
1171        for input in cases {
1172            let res = get_connection_info(input, &ClusterParams::default());
1173            assert_eq!(
1174                res.err(),
1175                Some(RedisError::from((
1176                    ErrorKind::InvalidClientConfig,
1177                    "Invalid node string",
1178                ))),
1179            );
1180        }
1181    }
1182}