redis/
cluster_routing.rs

1use std::cmp::min;
2use std::collections::{BTreeMap, HashMap, HashSet};
3
4use rand::prelude::IndexedRandom;
5use rand::rng;
6
7use crate::cmd::{Arg, Cmd};
8use crate::commands::is_readonly_cmd;
9use crate::types::Value;
10use crate::{ErrorKind, RedisResult};
11
12pub(crate) const SLOT_SIZE: u16 = 16384;
13
14fn slot(key: &[u8]) -> u16 {
15    crc16::State::<crc16::XMODEM>::calculate(key) % SLOT_SIZE
16}
17
/// A redirection, in one of two flavours: `Moved` or `Ask`.
// NOTE(review): the `String` payload is presumably the address of the node to
// redirect to - confirm against the code that constructs these values.
#[derive(Clone, PartialEq, Debug)]
pub(crate) enum Redirect {
    /// A `Moved` redirection.
    Moved(String),
    /// An `Ask` redirection.
    Ask(String),
}
23
/// Logical aggregating operators, applied element-wise to the integer
/// responses (where a value greater than 0 is treated as true).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum LogicalAggregateOp {
    /// Aggregate by logical AND: an element is true only if it is true in every response.
    And,
    // Or, omitted due to dead code warnings. ATM this value isn't constructed anywhere
}
31
/// Numerical aggregating operators.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AggregateOp {
    /// Choose the minimal value.
    Min,
    /// Sum all values.
    Sum,
    // Max, omitted due to dead code warnings. ATM this value isn't constructed anywhere
}
41
/// Policy defining how to combine multiple responses into one.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ResponsePolicy {
    /// Wait for one request to succeed and return its result. Return an error if all requests fail.
    OneSucceeded,
    /// Wait for one request to succeed with a non-empty value. Return an error if all requests fail or return `Nil`.
    OneSucceededNonEmpty,
    /// Wait for all requests to succeed, and then return one of the successes. Return the first received error, if any.
    AllSucceeded,
    /// Aggregate successful results according to a logical operator. Return an error on any failed request or on a response that doesn't conform to 0 or 1.
    AggregateLogical(LogicalAggregateOp),
    /// Aggregate successful results according to a numeric operator. Return an error on any failed request or on a response that isn't an integer.
    Aggregate(AggregateOp),
    /// Aggregate array responses into a single array. Return an error on any failed request or on a response that isn't an array.
    CombineArrays,
    /// Handling is not defined by the Redis standard. The response will receive special-case handling.
    Special,
}
60
/// Defines whether a request should be routed to a single node, or to multiple ones.
#[derive(Debug, Clone, PartialEq)]
pub enum RoutingInfo {
    /// Route to a single node.
    SingleNode(SingleNodeRoutingInfo),
    /// Route to multiple nodes. The tuple holds the set of target nodes and, when one is
    /// known for the command, the policy for combining their responses.
    MultiNode((MultipleNodeRoutingInfo, Option<ResponsePolicy>)),
}
69
/// Defines which single node should receive a request.
#[derive(Debug, Clone, PartialEq)]
pub enum SingleNodeRoutingInfo {
    /// Route to any node, chosen at random.
    Random,
    /// Route to the node that matches the [`Route`].
    SpecificNode(Route),
    /// Route to the node with the given address.
    ByAddress {
        /// DNS hostname of the node.
        host: String,
        /// Port of the node.
        port: u16,
    },
}
85
86impl From<Option<Route>> for SingleNodeRoutingInfo {
87    fn from(value: Option<Route>) -> Self {
88        value
89            .map(SingleNodeRoutingInfo::SpecificNode)
90            .unwrap_or(SingleNodeRoutingInfo::Random)
91    }
92}
93
/// Defines which collection of nodes should receive a request.
#[derive(Debug, Clone, PartialEq)]
pub enum MultipleNodeRoutingInfo {
    /// Route to all nodes in the cluster.
    AllNodes,
    /// Route to all primaries in the cluster.
    AllMasters,
    /// Instructions for how to split a multi-slot command (e.g. MGET, MSET) into sub-commands.
    /// Each tuple holds the route of one sub-command, and the indices of the arguments from
    /// the original command that should be copied to that sub-command.
    MultiSlot(Vec<(Route, Vec<usize>)>),
}
104
105/// Splits a command into a sub-command that won't generate a CROSSLOTS error.
106///
107///  Takes a routable and an iterator of indices, which is assumed to be created from`MultipleNodeRoutingInfo::MultiSlot`,
108/// and returns a command with the arguments matching the indices.
109pub fn command_for_multi_slot_indices<'a, 'b>(
110    original_cmd: &'a impl Routable,
111    indices: impl Iterator<Item = &'b usize> + 'a,
112) -> Cmd
113where
114    'b: 'a,
115{
116    let mut new_cmd = Cmd::new();
117    let command_length = 1; // TODO - the +1 should change if we have multi-slot commands with 2 command words.
118    new_cmd.arg(original_cmd.arg_idx(0));
119    for index in indices {
120        new_cmd.arg(original_cmd.arg_idx(index + command_length));
121    }
122    new_cmd
123}
124
125pub(crate) fn aggregate(values: Vec<Value>, op: AggregateOp) -> RedisResult<Value> {
126    let initial_value = match op {
127        AggregateOp::Min => i64::MAX,
128        AggregateOp::Sum => 0,
129    };
130    let result = values.into_iter().try_fold(initial_value, |acc, curr| {
131        let int = match curr {
132            Value::Int(int) => int,
133            _ => {
134                return RedisResult::Err(
135                    (
136                        ErrorKind::TypeError,
137                        "expected array of integers as response",
138                    )
139                        .into(),
140                );
141            }
142        };
143        let acc = match op {
144            AggregateOp::Min => min(acc, int),
145            AggregateOp::Sum => acc + int,
146        };
147        Ok(acc)
148    })?;
149    Ok(Value::Int(result))
150}
151
152pub(crate) fn logical_aggregate(values: Vec<Value>, op: LogicalAggregateOp) -> RedisResult<Value> {
153    let initial_value = match op {
154        LogicalAggregateOp::And => true,
155    };
156    let results = values.into_iter().try_fold(Vec::new(), |acc, curr| {
157        let values = match curr {
158            Value::Array(values) => values,
159            _ => {
160                return RedisResult::Err(
161                    (
162                        ErrorKind::TypeError,
163                        "expected array of integers as response",
164                    )
165                        .into(),
166                );
167            }
168        };
169        let mut acc = if acc.is_empty() {
170            vec![initial_value; values.len()]
171        } else {
172            acc
173        };
174        for (index, value) in values.into_iter().enumerate() {
175            let int = match value {
176                Value::Int(int) => int,
177                _ => {
178                    return Err((
179                        ErrorKind::TypeError,
180                        "expected array of integers as response",
181                    )
182                        .into());
183                }
184            };
185            acc[index] = match op {
186                LogicalAggregateOp::And => acc[index] && (int > 0),
187            };
188        }
189        Ok(acc)
190    })?;
191    Ok(Value::Array(
192        results
193            .into_iter()
194            .map(|result| Value::Int(result as i64))
195            .collect(),
196    ))
197}
198
199pub(crate) fn combine_array_results(values: Vec<Value>) -> RedisResult<Value> {
200    let mut results = Vec::new();
201
202    for value in values {
203        match value {
204            Value::Array(values) => results.extend(values),
205            _ => {
206                return Err((ErrorKind::TypeError, "expected array of values as response").into());
207            }
208        }
209    }
210
211    Ok(Value::Array(results))
212}
213
214/// Combines multiple call results in the `values` field, each assume to be an array of results,
215/// into a single array. `sorting_order` defines the order of the results in the returned array -
216/// for each array of results, `sorting_order` should contain a matching array with the indices of
217/// the results in the final array.
218pub(crate) fn combine_and_sort_array_results<'a>(
219    values: Vec<Value>,
220    sorting_order: impl ExactSizeIterator<Item = &'a Vec<usize>>,
221) -> RedisResult<Value> {
222    let mut results = Vec::new();
223    results.resize(
224        values.iter().fold(0, |acc, value| match value {
225            Value::Array(values) => values.len() + acc,
226            _ => 0,
227        }),
228        Value::Nil,
229    );
230    assert_eq!(values.len(), sorting_order.len());
231
232    for (key_indices, value) in sorting_order.into_iter().zip(values) {
233        match value {
234            Value::Array(values) => {
235                assert_eq!(values.len(), key_indices.len());
236                for (index, value) in key_indices.iter().zip(values) {
237                    results[*index] = value;
238                }
239            }
240            _ => {
241                return Err((ErrorKind::TypeError, "expected array of values as response").into());
242            }
243        }
244    }
245
246    Ok(Value::Array(results))
247}
248
249/// Returns the slot that matches `key`.
250pub fn get_slot(key: &[u8]) -> u16 {
251    let key = match get_hashtag(key) {
252        Some(tag) => tag,
253        None => key,
254    };
255
256    slot(key)
257}
258
259fn get_route(is_readonly: bool, key: &[u8]) -> Route {
260    let slot = get_slot(key);
261    if is_readonly {
262        Route::new(slot, SlotAddr::ReplicaOptional)
263    } else {
264        Route::new(slot, SlotAddr::Master)
265    }
266}
267
268/// Takes the given `routable` and creates a multi-slot routing info.
269/// This is used for commands like MSET & MGET, where if the command's keys
270/// are hashed to multiple slots, the command should be split into sub-commands,
271/// each targeting a single slot. The results of these sub-commands are then
272/// usually reassembled using `combine_and_sort_array_results`. In order to do this,
273/// `MultipleNodeRoutingInfo::MultiSlot` contains the routes for each sub-command, and
274/// the indices in the final combined result for each result from the sub-command.
275///
276/// If all keys are routed to the same slot, there's no need to split the command,
277/// so a single node routing info will be returned.
278fn multi_shard<R>(
279    routable: &R,
280    cmd: &[u8],
281    first_key_index: usize,
282    has_values: bool,
283) -> Option<RoutingInfo>
284where
285    R: Routable + ?Sized,
286{
287    let is_readonly = is_readonly_cmd(cmd);
288    let mut routes = HashMap::new();
289    let mut key_index = 0;
290    while let Some(key) = routable.arg_idx(first_key_index + key_index) {
291        let route = get_route(is_readonly, key);
292        let entry = routes.entry(route);
293        let keys = entry.or_insert(Vec::new());
294        keys.push(key_index);
295
296        if has_values {
297            key_index += 1;
298            routable.arg_idx(first_key_index + key_index)?; // check that there's a value for the key
299            keys.push(key_index);
300        }
301        key_index += 1;
302    }
303
304    let mut routes: Vec<(Route, Vec<usize>)> = routes.into_iter().collect();
305    Some(if routes.len() == 1 {
306        RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(routes.pop().unwrap().0))
307    } else {
308        RoutingInfo::MultiNode((
309            MultipleNodeRoutingInfo::MultiSlot(routes),
310            ResponsePolicy::for_command(cmd),
311        ))
312    })
313}
314
315impl ResponsePolicy {
316    /// Parse the command for the matching response policy.
317    pub fn for_command(cmd: &[u8]) -> Option<ResponsePolicy> {
318        match cmd {
319            b"SCRIPT EXISTS" => Some(ResponsePolicy::AggregateLogical(LogicalAggregateOp::And)),
320
321            b"DBSIZE" | b"DEL" | b"EXISTS" | b"SLOWLOG LEN" | b"TOUCH" | b"UNLINK"
322            | b"LATENCY RESET" => Some(ResponsePolicy::Aggregate(AggregateOp::Sum)),
323
324            b"WAIT" => Some(ResponsePolicy::Aggregate(AggregateOp::Min)),
325
326            b"ACL SETUSER" | b"ACL DELUSER" | b"ACL SAVE" | b"CLIENT SETNAME"
327            | b"CLIENT SETINFO" | b"CONFIG SET" | b"CONFIG RESETSTAT" | b"CONFIG REWRITE"
328            | b"FLUSHALL" | b"FLUSHDB" | b"FUNCTION DELETE" | b"FUNCTION FLUSH"
329            | b"FUNCTION LOAD" | b"FUNCTION RESTORE" | b"MEMORY PURGE" | b"MSET" | b"PING"
330            | b"SCRIPT FLUSH" | b"SCRIPT LOAD" | b"SLOWLOG RESET" => {
331                Some(ResponsePolicy::AllSucceeded)
332            }
333
334            b"KEYS" | b"MGET" | b"SLOWLOG GET" => Some(ResponsePolicy::CombineArrays),
335
336            b"FUNCTION KILL" | b"SCRIPT KILL" => Some(ResponsePolicy::OneSucceeded),
337
338            // This isn't based on response_tips, but on the discussion here - https://github.com/redis/redis/issues/12410
339            b"RANDOMKEY" => Some(ResponsePolicy::OneSucceededNonEmpty),
340
341            b"LATENCY GRAPH" | b"LATENCY HISTOGRAM" | b"LATENCY HISTORY" | b"LATENCY DOCTOR"
342            | b"LATENCY LATEST" => Some(ResponsePolicy::Special),
343
344            b"FUNCTION STATS" => Some(ResponsePolicy::Special),
345
346            b"MEMORY MALLOC-STATS" | b"MEMORY DOCTOR" | b"MEMORY STATS" => {
347                Some(ResponsePolicy::Special)
348            }
349
350            b"INFO" => Some(ResponsePolicy::Special),
351
352            _ => None,
353        }
354    }
355}
356
impl RoutingInfo {
    /// Returns the routing info for `r`.
    ///
    /// The command name is taken from [`Routable::command`] and matched against the
    /// tables below. Returns `None` when `r` has no command name, for commands that are
    /// explicitly not routed here (e.g. `SCAN`, `SHUTDOWN`), or when an argument needed
    /// for routing (a slot number, a key count, a key, the `STREAMS` keyword) is missing
    /// or cannot be parsed.
    pub fn for_routable<R>(r: &R) -> Option<RoutingInfo>
    where
        R: Routable + ?Sized,
    {
        let cmd = &r.command()?[..];
        match cmd {
            // Commands sent to all primaries.
            b"RANDOMKEY"
            | b"KEYS"
            | b"SCRIPT EXISTS"
            | b"WAIT"
            | b"DBSIZE"
            | b"FLUSHALL"
            | b"FUNCTION RESTORE"
            | b"FUNCTION DELETE"
            | b"FUNCTION FLUSH"
            | b"FUNCTION LOAD"
            | b"PING"
            | b"FLUSHDB"
            | b"MEMORY PURGE"
            | b"FUNCTION KILL"
            | b"SCRIPT KILL"
            | b"FUNCTION STATS"
            | b"MEMORY MALLOC-STATS"
            | b"MEMORY DOCTOR"
            | b"MEMORY STATS"
            | b"INFO" => Some(RoutingInfo::MultiNode((
                MultipleNodeRoutingInfo::AllMasters,
                ResponsePolicy::for_command(cmd),
            ))),

            // Commands sent to every node in the cluster.
            b"ACL SETUSER" | b"ACL DELUSER" | b"ACL SAVE" | b"CLIENT SETNAME"
            | b"CLIENT SETINFO" | b"SLOWLOG GET" | b"SLOWLOG LEN" | b"SLOWLOG RESET"
            | b"CONFIG SET" | b"CONFIG RESETSTAT" | b"CONFIG REWRITE" | b"SCRIPT FLUSH"
            | b"SCRIPT LOAD" | b"LATENCY RESET" | b"LATENCY GRAPH" | b"LATENCY HISTOGRAM"
            | b"LATENCY HISTORY" | b"LATENCY DOCTOR" | b"LATENCY LATEST" => {
                Some(RoutingInfo::MultiNode((
                    MultipleNodeRoutingInfo::AllNodes,
                    ResponsePolicy::for_command(cmd),
                )))
            }

            // keyless commands with more arguments, whose arguments might be wrongly taken to be keys.
            // TODO - double check these, in order to find better ways to route some of them.
            b"ACL DRYRUN"
            | b"ACL GENPASS"
            | b"ACL GETUSER"
            | b"ACL HELP"
            | b"ACL LIST"
            | b"ACL LOG"
            | b"ACL USERS"
            | b"ACL WHOAMI"
            | b"AUTH"
            | b"TIME"
            | b"PUBSUB CHANNELS"
            | b"PUBSUB NUMPAT"
            | b"PUBSUB SHARDCHANNELS"
            | b"BGSAVE"
            | b"WAITAOF"
            | b"SAVE"
            | b"LASTSAVE"
            | b"CLIENT TRACKINGINFO"
            | b"CLIENT PAUSE"
            | b"CLIENT UNPAUSE"
            | b"CLIENT UNBLOCK"
            | b"CLIENT ID"
            | b"CLIENT REPLY"
            | b"CLIENT GETNAME"
            | b"CLIENT GETREDIR"
            | b"CLIENT INFO"
            | b"CLIENT KILL"
            | b"CLUSTER INFO"
            | b"CLUSTER MEET"
            | b"CLUSTER MYSHARDID"
            | b"CLUSTER NODES"
            | b"CLUSTER REPLICAS"
            | b"CLUSTER RESET"
            | b"CLUSTER SET-CONFIG-EPOCH"
            | b"CLUSTER SLOTS"
            | b"CLUSTER SHARDS"
            | b"CLUSTER COUNT-FAILURE-REPORTS"
            | b"CLUSTER KEYSLOT"
            | b"COMMAND"
            | b"COMMAND COUNT"
            | b"COMMAND LIST"
            | b"COMMAND GETKEYS"
            | b"CONFIG GET"
            | b"DEBUG"
            | b"ECHO"
            | b"READONLY"
            | b"READWRITE"
            | b"TFUNCTION LOAD"
            | b"TFUNCTION DELETE"
            | b"TFUNCTION LIST"
            | b"TFCALL"
            | b"TFCALLASYNC"
            | b"MODULE LIST"
            | b"MODULE LOAD"
            | b"MODULE UNLOAD"
            | b"MODULE LOADEX" => Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)),

            // Slot-addressed commands: the argument at index 2 is parsed as the slot number,
            // and the request goes to that slot's primary. Yields `None` if the argument is
            // missing, not UTF-8, or not a valid `u16`.
            b"CLUSTER COUNTKEYSINSLOT"
            | b"CLUSTER GETKEYSINSLOT"
            | b"CLUSTER SETSLOT"
            | b"CLUSTER DELSLOTS"
            | b"CLUSTER DELSLOTSRANGE" => r
                .arg_idx(2)
                .and_then(|arg| std::str::from_utf8(arg).ok())
                .and_then(|slot| slot.parse::<u16>().ok())
                .map(|slot| {
                    RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new(
                        slot,
                        SlotAddr::Master,
                    )))
                }),

            // Multi-key commands, possibly split across slots; keys start at argument 1.
            b"MGET" | b"DEL" | b"EXISTS" | b"UNLINK" | b"TOUCH" => multi_shard(r, cmd, 1, false),
            // MSET arguments alternate between keys and values.
            b"MSET" => multi_shard(r, cmd, 1, true),
            // TODO - special handling - b"SCAN"
            b"SCAN" | b"SHUTDOWN" | b"SLAVEOF" | b"REPLICAOF" | b"MOVE" | b"BITOP" => None,
            b"EVALSHA" | b"EVAL" => {
                // The argument at index 2 is the number of keys passed to the script.
                let key_count = r
                    .arg_idx(2)
                    .and_then(|x| std::str::from_utf8(x).ok())
                    .and_then(|x| x.parse::<u64>().ok())?;
                if key_count == 0 {
                    Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
                } else {
                    // Only the first key (argument 3) is used for routing.
                    r.arg_idx(3).map(|key| RoutingInfo::for_key(cmd, key))
                }
            }
            // Two-word commands: the argument at index 2 is used as the routing key.
            b"XGROUP CREATE"
            | b"XGROUP CREATECONSUMER"
            | b"XGROUP DELCONSUMER"
            | b"XGROUP DESTROY"
            | b"XGROUP SETID"
            | b"XINFO CONSUMERS"
            | b"XINFO GROUPS"
            | b"XINFO STREAM"
            | b"PUBSUB SHARDNUMSUB"
            | b"PUBSUB NUMSUB" => r.arg_idx(2).map(|key| RoutingInfo::for_key(cmd, key)),
            b"XREAD" | b"XREADGROUP" => {
                // The routing key is the argument right after the `STREAMS` keyword.
                let streams_position = r.position(b"STREAMS")?;
                r.arg_idx(streams_position + 1)
                    .map(|key| RoutingInfo::for_key(cmd, key))
            }
            // Default: treat argument 1 as the key; with no arguments, any node will do.
            _ => match r.arg_idx(1) {
                Some(key) => Some(RoutingInfo::for_key(cmd, key)),
                None => Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)),
            },
        }
    }

    /// Routes to the single node that owns `key`'s slot; whether a replica may serve the
    /// request depends on whether `cmd` is a read-only command.
    fn for_key(cmd: &[u8], key: &[u8]) -> RoutingInfo {
        RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(get_route(
            is_readonly_cmd(cmd),
            key,
        )))
    }
}
518
/// Objects that implement this trait define a request that can be routed by a cluster client to different nodes in the cluster.
pub trait Routable {
    /// Convenience function that returns the ASCII-uppercased command name.
    ///
    /// For command families that take a sub-command (e.g. `XINFO`, `CLUSTER`), the
    /// second argument, if present, is appended after a single space, so the result
    /// looks like `XINFO GROUPS`. Returns `None` if there is no first argument.
    fn command(&self) -> Option<Vec<u8>> {
        let mut name = self.arg_idx(0)?.to_ascii_uppercase();

        let takes_subcommand = matches!(
            name.as_slice(),
            b"XGROUP"
                | b"OBJECT"
                | b"SLOWLOG"
                | b"FUNCTION"
                | b"MODULE"
                | b"COMMAND"
                | b"PUBSUB"
                | b"CONFIG"
                | b"MEMORY"
                | b"XINFO"
                | b"CLIENT"
                | b"ACL"
                | b"SCRIPT"
                | b"CLUSTER"
                | b"LATENCY"
        );

        if takes_subcommand {
            if let Some(subcommand) = self.arg_idx(1) {
                name.reserve(subcommand.len() + 1);
                name.push(b' ');
                name.extend(subcommand.iter().map(u8::to_ascii_uppercase));
            }
        }

        Some(name)
    }

    /// Returns a reference to the data for the argument at `idx`.
    fn arg_idx(&self, idx: usize) -> Option<&[u8]>;

    /// Returns index of argument that matches `candidate`, if it exists
    fn position(&self, candidate: &[u8]) -> Option<usize>;
}
554
555impl Routable for Cmd {
556    fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
557        self.arg_idx(idx)
558    }
559
560    fn position(&self, candidate: &[u8]) -> Option<usize> {
561        self.args_iter().position(|a| match a {
562            Arg::Simple(d) => d.eq_ignore_ascii_case(candidate),
563            _ => false,
564        })
565    }
566}
567
568impl Routable for Value {
569    fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
570        match self {
571            Value::Array(args) => match args.get(idx) {
572                Some(Value::BulkString(ref data)) => Some(&data[..]),
573                _ => None,
574            },
575            _ => None,
576        }
577    }
578
579    fn position(&self, candidate: &[u8]) -> Option<usize> {
580        match self {
581            Value::Array(args) => args.iter().position(|a| match a {
582                Value::BulkString(d) => d.eq_ignore_ascii_case(candidate),
583                _ => false,
584            }),
585            _ => None,
586        }
587    }
588}
589
/// A range of slots together with the nodes that serve it.
#[derive(PartialEq, Debug)]
pub(crate) struct Slot {
    /// First slot of the range.
    pub(crate) start: u16,
    /// Last slot of the range.
    pub(crate) end: u16,
    /// The range's master node.
    pub(crate) master: String,
    /// The range's replica nodes.
    pub(crate) replicas: Vec<String>,
}

impl Slot {
    /// Creates a slot range `s..=e` with master `m` and replicas `r`.
    pub fn new(s: u16, e: u16, m: String, r: Vec<String>) -> Self {
        let (start, end, master, replicas) = (s, e, m, r);
        Self {
            start,
            end,
            master,
            replicas,
        }
    }
}
608
/// The type of node a request should be routed to.
#[derive(Eq, PartialEq, Clone, Copy, Debug, Hash)]
pub enum SlotAddr {
    /// The request must be routed to the primary node.
    Master,
    /// The request may be routed to a replica node.
    /// For example, a GET command can be routed either to a replica or to the primary.
    ReplicaOptional,
    /// The request must be routed to a replica node, if one exists.
    /// For example, when the user explicitly requested this routing.
    ReplicaRequired,
}
621
/// A simplified version of [`Slot`] that keeps only the node addresses:
/// the primary and the list of replicas, without the slot range.
#[derive(Debug)]
pub(crate) struct SlotAddrs {
    /// The primary node.
    primary: String,
    /// The replica nodes; may be empty.
    replicas: Vec<String>,
}
631
632impl SlotAddrs {
633    pub(crate) fn new(primary: String, replicas: Vec<String>) -> Self {
634        Self { primary, replicas }
635    }
636
637    fn get_replica_node(&self) -> &str {
638        self.replicas.choose(&mut rng()).unwrap_or(&self.primary)
639    }
640
641    pub(crate) fn slot_addr(&self, slot_addr: &SlotAddr, read_from_replica: bool) -> &str {
642        match slot_addr {
643            SlotAddr::Master => &self.primary,
644            SlotAddr::ReplicaOptional => {
645                if read_from_replica {
646                    self.get_replica_node()
647                } else {
648                    &self.primary
649                }
650            }
651            SlotAddr::ReplicaRequired => self.get_replica_node(),
652        }
653    }
654
655    pub(crate) fn from_slot(slot: Slot) -> Self {
656        SlotAddrs::new(slot.master, slot.replicas)
657    }
658}
659
660impl<'a> IntoIterator for &'a SlotAddrs {
661    type Item = &'a String;
662    type IntoIter = std::iter::Chain<std::iter::Once<&'a String>, std::slice::Iter<'a, String>>;
663
664    fn into_iter(
665        self,
666    ) -> std::iter::Chain<std::iter::Once<&'a String>, std::slice::Iter<'a, String>> {
667        std::iter::once(&self.primary).chain(self.replicas.iter())
668    }
669}
670
671#[derive(Debug)]
672struct SlotMapValue {
673    start: u16,
674    addrs: SlotAddrs,
675}
676
677impl SlotMapValue {
678    fn from_slot(slot: Slot) -> Self {
679        Self {
680            start: slot.start,
681            addrs: SlotAddrs::from_slot(slot),
682        }
683    }
684}
685
/// Maps slots to the nodes that serve them.
#[derive(Debug, Default)]
pub(crate) struct SlotMap {
    /// Slot ranges, keyed by the last slot of each range; each value holds the
    /// first slot of the range and the nodes serving it.
    slots: BTreeMap<u16, SlotMapValue>,
    /// Whether routes that allow replicas (`SlotAddr::ReplicaOptional`) should
    /// be resolved to a replica.
    read_from_replica: bool,
}
691
692impl SlotMap {
693    pub fn new(read_from_replica: bool) -> Self {
694        Self {
695            slots: Default::default(),
696            read_from_replica,
697        }
698    }
699
700    pub fn from_slots(slots: Vec<Slot>, read_from_replica: bool) -> Self {
701        Self {
702            slots: slots
703                .into_iter()
704                .map(|slot| (slot.end, SlotMapValue::from_slot(slot)))
705                .collect(),
706            read_from_replica,
707        }
708    }
709
710    #[cfg(feature = "cluster-async")]
711    pub fn fill_slots(&mut self, slots: Vec<Slot>) {
712        for slot in slots {
713            self.slots.insert(slot.end, SlotMapValue::from_slot(slot));
714        }
715    }
716
717    pub fn slot_addr_for_route(&self, route: &Route) -> Option<&str> {
718        let slot = route.slot();
719        self.slots
720            .range(slot..)
721            .next()
722            .and_then(|(end, slot_value)| {
723                if slot <= *end && slot_value.start <= slot {
724                    Some(
725                        slot_value
726                            .addrs
727                            .slot_addr(route.slot_addr(), self.read_from_replica),
728                    )
729                } else {
730                    None
731                }
732            })
733    }
734
735    #[cfg(feature = "cluster-async")]
736    pub fn clear(&mut self) {
737        self.slots.clear();
738    }
739
740    pub fn values(&self) -> impl Iterator<Item = &SlotAddrs> {
741        self.slots.values().map(|slot_value| &slot_value.addrs)
742    }
743
744    fn all_unique_addresses(&self, only_primaries: bool) -> HashSet<&str> {
745        let mut addresses: HashSet<&str> = HashSet::new();
746        if only_primaries {
747            addresses.extend(
748                self.values().map(|slot_addrs| {
749                    slot_addrs.slot_addr(&SlotAddr::Master, self.read_from_replica)
750                }),
751            );
752        } else {
753            addresses.extend(
754                self.values()
755                    .flat_map(|slot_addrs| slot_addrs.into_iter())
756                    .map(|str| str.as_str()),
757            );
758        }
759
760        addresses
761    }
762
763    pub fn addresses_for_all_primaries(&self) -> HashSet<&str> {
764        self.all_unique_addresses(true)
765    }
766
767    pub fn addresses_for_all_nodes(&self) -> HashSet<&str> {
768        self.all_unique_addresses(false)
769    }
770
771    pub fn addresses_for_multi_slot<'a, 'b>(
772        &'a self,
773        routes: &'b [(Route, Vec<usize>)],
774    ) -> impl Iterator<Item = Option<&'a str>> + 'a
775    where
776        'b: 'a,
777    {
778        routes
779            .iter()
780            .map(|(route, _)| self.slot_addr_for_route(route))
781    }
782}
783
784/// Defines the slot and the [`SlotAddr`] to which
785/// a command should be sent
786#[derive(Eq, PartialEq, Clone, Copy, Debug, Hash)]
787pub struct Route(u16, SlotAddr);
788
789impl Route {
790    /// Returns a new Route.
791    pub fn new(slot: u16, slot_addr: SlotAddr) -> Self {
792        Self(slot, slot_addr)
793    }
794
795    pub(crate) fn slot(&self) -> u16 {
796        self.0
797    }
798
799    pub(crate) fn slot_addr(&self) -> &SlotAddr {
800        &self.1
801    }
802}
803
/// Extracts the hashtag of `key`: the bytes between the first `{` and the first `}`
/// that follows it.
///
/// Returns `None` if there is no `{`, no `}` after it, or nothing between the two.
fn get_hashtag(key: &[u8]) -> Option<&[u8]> {
    let open = key.iter().position(|&byte| byte == b'{')?;
    let after_open = &key[open + 1..];
    let close = after_open.iter().position(|&byte| byte == b'}')?;

    let tag = &after_open[..close];
    if tag.is_empty() {
        None
    } else {
        Some(tag)
    }
}
812
813#[cfg(test)]
814mod tests {
815    use core::panic;
816    use std::collections::HashSet;
817
818    use super::{
819        command_for_multi_slot_indices, get_hashtag, slot, MultipleNodeRoutingInfo, Route,
820        RoutingInfo, SingleNodeRoutingInfo, Slot, SlotAddr, SlotMap,
821    };
822    use crate::{
823        cluster_routing::{AggregateOp, ResponsePolicy},
824        cmd,
825        parser::parse_redis_value,
826        Value,
827    };
828
829    #[test]
830    fn test_get_hashtag() {
831        assert_eq!(get_hashtag(&b"foo{bar}baz"[..]), Some(&b"bar"[..]));
832        assert_eq!(get_hashtag(&b"foo{}{baz}"[..]), None);
833        assert_eq!(get_hashtag(&b"foo{{bar}}zap"[..]), Some(&b"{bar"[..]));
834    }
835
836    #[test]
837    fn test_routing_info_mixed_capatalization() {
838        let mut upper = cmd("XREAD");
839        upper.arg("STREAMS").arg("foo").arg(0);
840
841        let mut lower = cmd("xread");
842        lower.arg("streams").arg("foo").arg(0);
843
844        assert_eq!(
845            RoutingInfo::for_routable(&upper).unwrap(),
846            RoutingInfo::for_routable(&lower).unwrap()
847        );
848
849        let mut mixed = cmd("xReAd");
850        mixed.arg("StReAmS").arg("foo").arg(0);
851
852        assert_eq!(
853            RoutingInfo::for_routable(&lower).unwrap(),
854            RoutingInfo::for_routable(&mixed).unwrap()
855        );
856    }
857
858    #[test]
859    fn test_routing_info() {
860        let mut test_cmds = vec![];
861
862        // RoutingInfo::AllMasters
863        let mut test_cmd = cmd("FLUSHALL");
864        test_cmd.arg("");
865        test_cmds.push(test_cmd);
866
867        // RoutingInfo::AllNodes
868        test_cmd = cmd("ECHO");
869        test_cmd.arg("");
870        test_cmds.push(test_cmd);
871
872        // Routing key is 2nd arg ("42")
873        test_cmd = cmd("SET");
874        test_cmd.arg("42");
875        test_cmds.push(test_cmd);
876
877        // Routing key is 3rd arg ("FOOBAR")
878        test_cmd = cmd("XINFO");
879        test_cmd.arg("GROUPS").arg("FOOBAR");
880        test_cmds.push(test_cmd);
881
882        // Routing key is 3rd or 4th arg (3rd = "0" == RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
883        test_cmd = cmd("EVAL");
884        test_cmd.arg("FOO").arg("0").arg("BAR");
885        test_cmds.push(test_cmd);
886
887        // Routing key is 3rd or 4th arg (3rd != "0" == RoutingInfo::Slot)
888        test_cmd = cmd("EVAL");
889        test_cmd.arg("FOO").arg("4").arg("BAR");
890        test_cmds.push(test_cmd);
891
892        // Routing key position is variable, 3rd arg
893        test_cmd = cmd("XREAD");
894        test_cmd.arg("STREAMS").arg("4");
895        test_cmds.push(test_cmd);
896
897        // Routing key position is variable, 4th arg
898        test_cmd = cmd("XREAD");
899        test_cmd.arg("FOO").arg("STREAMS").arg("4");
900        test_cmds.push(test_cmd);
901
902        for cmd in test_cmds {
903            let value = parse_redis_value(&cmd.get_packed_command()).unwrap();
904            assert_eq!(
905                RoutingInfo::for_routable(&value).unwrap(),
906                RoutingInfo::for_routable(&cmd).unwrap(),
907            );
908        }
909
910        // Assert expected RoutingInfo explicitly:
911
912        for cmd in [cmd("FLUSHALL"), cmd("FLUSHDB"), cmd("PING")] {
913            assert_eq!(
914                RoutingInfo::for_routable(&cmd),
915                Some(RoutingInfo::MultiNode((
916                    MultipleNodeRoutingInfo::AllMasters,
917                    Some(ResponsePolicy::AllSucceeded)
918                )))
919            );
920        }
921
922        assert_eq!(
923            RoutingInfo::for_routable(&cmd("DBSIZE")),
924            Some(RoutingInfo::MultiNode((
925                MultipleNodeRoutingInfo::AllMasters,
926                Some(ResponsePolicy::Aggregate(AggregateOp::Sum))
927            )))
928        );
929
930        assert_eq!(
931            RoutingInfo::for_routable(&cmd("SCRIPT KILL")),
932            Some(RoutingInfo::MultiNode((
933                MultipleNodeRoutingInfo::AllMasters,
934                Some(ResponsePolicy::OneSucceeded)
935            )))
936        );
937
938        assert_eq!(
939            RoutingInfo::for_routable(&cmd("INFO")),
940            Some(RoutingInfo::MultiNode((
941                MultipleNodeRoutingInfo::AllMasters,
942                Some(ResponsePolicy::Special)
943            )))
944        );
945
946        assert_eq!(
947            RoutingInfo::for_routable(&cmd("KEYS")),
948            Some(RoutingInfo::MultiNode((
949                MultipleNodeRoutingInfo::AllMasters,
950                Some(ResponsePolicy::CombineArrays)
951            )))
952        );
953
954        for cmd in vec![
955            cmd("SCAN"),
956            cmd("SHUTDOWN"),
957            cmd("SLAVEOF"),
958            cmd("REPLICAOF"),
959            cmd("MOVE"),
960            cmd("BITOP"),
961        ] {
962            assert_eq!(
963                RoutingInfo::for_routable(&cmd),
964                None,
965                "{}",
966                std::str::from_utf8(cmd.arg_idx(0).unwrap()).unwrap()
967            );
968        }
969
970        for cmd in [
971            cmd("EVAL").arg(r#"redis.call("PING");"#).arg(0),
972            cmd("EVALSHA").arg(r#"redis.call("PING");"#).arg(0),
973        ] {
974            assert_eq!(
975                RoutingInfo::for_routable(cmd),
976                Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
977            );
978        }
979
980        for (cmd, expected) in [
981            (
982                cmd("EVAL")
983                    .arg(r#"redis.call("GET, KEYS[1]");"#)
984                    .arg(1)
985                    .arg("foo"),
986                Some(RoutingInfo::SingleNode(
987                    SingleNodeRoutingInfo::SpecificNode(Route::new(slot(b"foo"), SlotAddr::Master)),
988                )),
989            ),
990            (
991                cmd("XGROUP")
992                    .arg("CREATE")
993                    .arg("mystream")
994                    .arg("workers")
995                    .arg("$")
996                    .arg("MKSTREAM"),
997                Some(RoutingInfo::SingleNode(
998                    SingleNodeRoutingInfo::SpecificNode(Route::new(
999                        slot(b"mystream"),
1000                        SlotAddr::Master,
1001                    )),
1002                )),
1003            ),
1004            (
1005                cmd("XINFO").arg("GROUPS").arg("foo"),
1006                Some(RoutingInfo::SingleNode(
1007                    SingleNodeRoutingInfo::SpecificNode(Route::new(
1008                        slot(b"foo"),
1009                        SlotAddr::ReplicaOptional,
1010                    )),
1011                )),
1012            ),
1013            (
1014                cmd("XREADGROUP")
1015                    .arg("GROUP")
1016                    .arg("wkrs")
1017                    .arg("consmrs")
1018                    .arg("STREAMS")
1019                    .arg("mystream"),
1020                Some(RoutingInfo::SingleNode(
1021                    SingleNodeRoutingInfo::SpecificNode(Route::new(
1022                        slot(b"mystream"),
1023                        SlotAddr::Master,
1024                    )),
1025                )),
1026            ),
1027            (
1028                cmd("XREAD")
1029                    .arg("COUNT")
1030                    .arg("2")
1031                    .arg("STREAMS")
1032                    .arg("mystream")
1033                    .arg("writers")
1034                    .arg("0-0")
1035                    .arg("0-0"),
1036                Some(RoutingInfo::SingleNode(
1037                    SingleNodeRoutingInfo::SpecificNode(Route::new(
1038                        slot(b"mystream"),
1039                        SlotAddr::ReplicaOptional,
1040                    )),
1041                )),
1042            ),
1043        ] {
1044            assert_eq!(
1045                RoutingInfo::for_routable(cmd),
1046                expected,
1047                "{}",
1048                std::str::from_utf8(cmd.arg_idx(0).unwrap()).unwrap()
1049            );
1050        }
1051    }
1052
1053    #[test]
1054    fn test_slot_for_packed_cmd() {
1055        assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[
1056                42, 50, 13, 10, 36, 54, 13, 10, 69, 88, 73, 83, 84, 83, 13, 10, 36, 49, 54, 13, 10,
1057                244, 93, 23, 40, 126, 127, 253, 33, 89, 47, 185, 204, 171, 249, 96, 139, 13, 10
1058            ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::ReplicaOptional)))) if slot == 964));
1059
1060        assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[
1061                42, 54, 13, 10, 36, 51, 13, 10, 83, 69, 84, 13, 10, 36, 49, 54, 13, 10, 36, 241,
1062                197, 111, 180, 254, 5, 175, 143, 146, 171, 39, 172, 23, 164, 145, 13, 10, 36, 52,
1063                13, 10, 116, 114, 117, 101, 13, 10, 36, 50, 13, 10, 78, 88, 13, 10, 36, 50, 13, 10,
1064                80, 88, 13, 10, 36, 55, 13, 10, 49, 56, 48, 48, 48, 48, 48, 13, 10
1065            ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Master)))) if slot == 8352));
1066
1067        assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[
1068                42, 54, 13, 10, 36, 51, 13, 10, 83, 69, 84, 13, 10, 36, 49, 54, 13, 10, 169, 233,
1069                247, 59, 50, 247, 100, 232, 123, 140, 2, 101, 125, 221, 66, 170, 13, 10, 36, 52,
1070                13, 10, 116, 114, 117, 101, 13, 10, 36, 50, 13, 10, 78, 88, 13, 10, 36, 50, 13, 10,
1071                80, 88, 13, 10, 36, 55, 13, 10, 49, 56, 48, 48, 48, 48, 48, 13, 10
1072            ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Master)))) if slot == 5210));
1073    }
1074
1075    #[test]
1076    fn test_multi_shard() {
1077        let mut cmd = cmd("DEL");
1078        cmd.arg("foo").arg("bar").arg("baz").arg("{bar}vaz");
1079        let routing = RoutingInfo::for_routable(&cmd);
1080        let mut expected = std::collections::HashMap::new();
1081        expected.insert(Route(4813, SlotAddr::Master), vec![2]);
1082        expected.insert(Route(5061, SlotAddr::Master), vec![1, 3]);
1083        expected.insert(Route(12182, SlotAddr::Master), vec![0]);
1084
1085        assert!(
1086            matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot(vec), Some(ResponsePolicy::Aggregate(AggregateOp::Sum))))) if {
1087                let routes = vec.clone().into_iter().collect();
1088                expected == routes
1089            }),
1090            "{routing:?}"
1091        );
1092
1093        let mut cmd = crate::cmd("MGET");
1094        cmd.arg("foo").arg("bar").arg("baz").arg("{bar}vaz");
1095        let routing = RoutingInfo::for_routable(&cmd);
1096        let mut expected = std::collections::HashMap::new();
1097        expected.insert(Route(4813, SlotAddr::ReplicaOptional), vec![2]);
1098        expected.insert(Route(5061, SlotAddr::ReplicaOptional), vec![1, 3]);
1099        expected.insert(Route(12182, SlotAddr::ReplicaOptional), vec![0]);
1100
1101        assert!(
1102            matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot(vec), Some(ResponsePolicy::CombineArrays)))) if {
1103                let routes = vec.clone().into_iter().collect();
1104                expected ==routes
1105            }),
1106            "{routing:?}"
1107        );
1108    }
1109
1110    #[test]
1111    fn test_command_creation_for_multi_shard() {
1112        let mut original_cmd = cmd("DEL");
1113        original_cmd
1114            .arg("foo")
1115            .arg("bar")
1116            .arg("baz")
1117            .arg("{bar}vaz");
1118        let routing = RoutingInfo::for_routable(&original_cmd);
1119        let expected = [vec![0], vec![1, 3], vec![2]];
1120
1121        let mut indices: Vec<_> = match routing {
1122            Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot(vec), _))) => {
1123                vec.into_iter().map(|(_, indices)| indices).collect()
1124            }
1125            _ => panic!("unexpected routing: {routing:?}"),
1126        };
1127        indices.sort_by(|prev, next| prev.iter().next().unwrap().cmp(next.iter().next().unwrap())); // sorting because the `for_routable` doesn't return values in a consistent order between runs.
1128
1129        for (index, indices) in indices.into_iter().enumerate() {
1130            let cmd = command_for_multi_slot_indices(&original_cmd, indices.iter());
1131            let expected_indices = &expected[index];
1132            assert_eq!(original_cmd.arg_idx(0), cmd.arg_idx(0));
1133            for (index, target_index) in expected_indices.iter().enumerate() {
1134                let target_index = target_index + 1;
1135                assert_eq!(original_cmd.arg_idx(target_index), cmd.arg_idx(index + 1));
1136            }
1137        }
1138    }
1139
1140    #[test]
1141    fn test_combine_multi_shard_to_single_node_when_all_keys_are_in_same_slot() {
1142        let mut cmd = cmd("DEL");
1143        cmd.arg("foo").arg("{foo}bar").arg("{foo}baz");
1144        let routing = RoutingInfo::for_routable(&cmd);
1145
1146        assert!(
1147            matches!(
1148                routing,
1149                Some(RoutingInfo::SingleNode(
1150                    SingleNodeRoutingInfo::SpecificNode(Route(12182, SlotAddr::Master))
1151                ))
1152            ),
1153            "{routing:?}"
1154        );
1155    }
1156
1157    #[test]
1158    fn test_slot_map() {
1159        let slot_map = SlotMap::from_slots(
1160            vec![
1161                Slot {
1162                    start: 1,
1163                    end: 1000,
1164                    master: "node1:6379".to_owned(),
1165                    replicas: vec!["replica1:6379".to_owned()],
1166                },
1167                Slot {
1168                    start: 1001,
1169                    end: 2000,
1170                    master: "node2:6379".to_owned(),
1171                    replicas: vec!["replica2:6379".to_owned()],
1172                },
1173            ],
1174            true,
1175        );
1176
1177        assert_eq!(
1178            "node1:6379",
1179            slot_map
1180                .slot_addr_for_route(&Route::new(1, SlotAddr::Master))
1181                .unwrap()
1182        );
1183        assert_eq!(
1184            "node1:6379",
1185            slot_map
1186                .slot_addr_for_route(&Route::new(500, SlotAddr::Master))
1187                .unwrap()
1188        );
1189        assert_eq!(
1190            "node1:6379",
1191            slot_map
1192                .slot_addr_for_route(&Route::new(1000, SlotAddr::Master))
1193                .unwrap()
1194        );
1195        assert_eq!(
1196            "replica1:6379",
1197            slot_map
1198                .slot_addr_for_route(&Route::new(1000, SlotAddr::ReplicaOptional))
1199                .unwrap()
1200        );
1201        assert_eq!(
1202            "node2:6379",
1203            slot_map
1204                .slot_addr_for_route(&Route::new(1001, SlotAddr::Master))
1205                .unwrap()
1206        );
1207        assert_eq!(
1208            "node2:6379",
1209            slot_map
1210                .slot_addr_for_route(&Route::new(1500, SlotAddr::Master))
1211                .unwrap()
1212        );
1213        assert_eq!(
1214            "node2:6379",
1215            slot_map
1216                .slot_addr_for_route(&Route::new(2000, SlotAddr::Master))
1217                .unwrap()
1218        );
1219        assert!(slot_map
1220            .slot_addr_for_route(&Route::new(2001, SlotAddr::Master))
1221            .is_none());
1222    }
1223
1224    #[test]
1225    fn test_slot_map_when_read_from_replica_is_false() {
1226        let slot_map = SlotMap::from_slots(
1227            vec![Slot {
1228                start: 1,
1229                end: 1000,
1230                master: "node1:6379".to_owned(),
1231                replicas: vec!["replica1:6379".to_owned()],
1232            }],
1233            false,
1234        );
1235
1236        assert_eq!(
1237            "node1:6379",
1238            slot_map
1239                .slot_addr_for_route(&Route::new(1000, SlotAddr::ReplicaOptional))
1240                .unwrap()
1241        );
1242        assert_eq!(
1243            "replica1:6379",
1244            slot_map
1245                .slot_addr_for_route(&Route::new(1000, SlotAddr::ReplicaRequired))
1246                .unwrap()
1247        );
1248    }
1249
1250    #[test]
1251    fn test_combining_results_into_single_array() {
1252        let res1 = Value::Array(vec![Value::Nil, Value::Okay]);
1253        let res2 = Value::Array(vec![
1254            Value::BulkString("1".as_bytes().to_vec()),
1255            Value::BulkString("4".as_bytes().to_vec()),
1256        ]);
1257        let res3 = Value::Array(vec![Value::SimpleString("2".to_string()), Value::Int(3)]);
1258        let results = super::combine_and_sort_array_results(
1259            vec![res1, res2, res3],
1260            [vec![0, 5], vec![1, 4], vec![2, 3]].iter(),
1261        );
1262
1263        assert_eq!(
1264            results.unwrap(),
1265            Value::Array(vec![
1266                Value::Nil,
1267                Value::BulkString("1".as_bytes().to_vec()),
1268                Value::SimpleString("2".to_string()),
1269                Value::Int(3),
1270                Value::BulkString("4".as_bytes().to_vec()),
1271                Value::Okay,
1272            ])
1273        );
1274    }
1275
1276    fn get_slot_map(read_from_replica: bool) -> SlotMap {
1277        SlotMap::from_slots(
1278            vec![
1279                Slot::new(
1280                    1,
1281                    1000,
1282                    "node1:6379".to_owned(),
1283                    vec!["replica1:6379".to_owned()],
1284                ),
1285                Slot::new(
1286                    1002,
1287                    2000,
1288                    "node2:6379".to_owned(),
1289                    vec!["replica2:6379".to_owned(), "replica3:6379".to_owned()],
1290                ),
1291                Slot::new(
1292                    2001,
1293                    3000,
1294                    "node3:6379".to_owned(),
1295                    vec![
1296                        "replica4:6379".to_owned(),
1297                        "replica5:6379".to_owned(),
1298                        "replica6:6379".to_owned(),
1299                    ],
1300                ),
1301                Slot::new(
1302                    3001,
1303                    4000,
1304                    "node2:6379".to_owned(),
1305                    vec!["replica2:6379".to_owned(), "replica3:6379".to_owned()],
1306                ),
1307            ],
1308            read_from_replica,
1309        )
1310    }
1311
1312    #[test]
1313    fn test_slot_map_get_all_primaries() {
1314        let slot_map = get_slot_map(false);
1315        let addresses = slot_map.addresses_for_all_primaries();
1316        assert_eq!(
1317            addresses,
1318            HashSet::from_iter(["node1:6379", "node2:6379", "node3:6379"])
1319        );
1320    }
1321
1322    #[test]
1323    fn test_slot_map_get_all_nodes() {
1324        let slot_map = get_slot_map(false);
1325        let addresses = slot_map.addresses_for_all_nodes();
1326        assert_eq!(
1327            addresses,
1328            HashSet::from_iter([
1329                "node1:6379",
1330                "node2:6379",
1331                "node3:6379",
1332                "replica1:6379",
1333                "replica2:6379",
1334                "replica3:6379",
1335                "replica4:6379",
1336                "replica5:6379",
1337                "replica6:6379"
1338            ])
1339        );
1340    }
1341
1342    #[test]
1343    fn test_slot_map_get_multi_node() {
1344        let slot_map = get_slot_map(true);
1345        let routes = vec![
1346            (Route::new(1, SlotAddr::Master), vec![]),
1347            (Route::new(2001, SlotAddr::ReplicaOptional), vec![]),
1348        ];
1349        let addresses = slot_map
1350            .addresses_for_multi_slot(&routes)
1351            .collect::<Vec<_>>();
1352        assert!(addresses.contains(&Some("node1:6379")));
1353        assert!(
1354            addresses.contains(&Some("replica4:6379"))
1355                || addresses.contains(&Some("replica5:6379"))
1356                || addresses.contains(&Some("replica6:6379"))
1357        );
1358    }
1359
1360    #[test]
1361    fn test_slot_map_should_ignore_replicas_in_multi_slot_if_read_from_replica_is_false() {
1362        let slot_map = get_slot_map(false);
1363        let routes = vec![
1364            (Route::new(1, SlotAddr::Master), vec![]),
1365            (Route::new(2001, SlotAddr::ReplicaOptional), vec![]),
1366        ];
1367        let addresses = slot_map
1368            .addresses_for_multi_slot(&routes)
1369            .collect::<Vec<_>>();
1370        assert_eq!(addresses, vec![Some("node1:6379"), Some("node3:6379")]);
1371    }
1372
1373    /// This test is needed in order to verify that if the MultiSlot route finds the same node for more than a single route,
1374    /// that node's address will appear multiple times, in the same order.
1375    #[test]
1376    fn test_slot_map_get_repeating_addresses_when_the_same_node_is_found_in_multi_slot() {
1377        let slot_map = get_slot_map(true);
1378        let routes = vec![
1379            (Route::new(1, SlotAddr::ReplicaOptional), vec![]),
1380            (Route::new(2001, SlotAddr::Master), vec![]),
1381            (Route::new(2, SlotAddr::ReplicaOptional), vec![]),
1382            (Route::new(2002, SlotAddr::Master), vec![]),
1383            (Route::new(3, SlotAddr::ReplicaOptional), vec![]),
1384            (Route::new(2003, SlotAddr::Master), vec![]),
1385        ];
1386        let addresses = slot_map
1387            .addresses_for_multi_slot(&routes)
1388            .collect::<Vec<_>>();
1389        assert_eq!(
1390            addresses,
1391            vec![
1392                Some("replica1:6379"),
1393                Some("node3:6379"),
1394                Some("replica1:6379"),
1395                Some("node3:6379"),
1396                Some("replica1:6379"),
1397                Some("node3:6379")
1398            ]
1399        );
1400    }
1401
1402    #[test]
1403    fn test_slot_map_get_none_when_slot_is_missing_from_multi_slot() {
1404        let slot_map = get_slot_map(true);
1405        let routes = vec![
1406            (Route::new(1, SlotAddr::ReplicaOptional), vec![]),
1407            (Route::new(5000, SlotAddr::Master), vec![]),
1408            (Route::new(6000, SlotAddr::ReplicaOptional), vec![]),
1409            (Route::new(2002, SlotAddr::Master), vec![]),
1410        ];
1411        let addresses = slot_map
1412            .addresses_for_multi_slot(&routes)
1413            .collect::<Vec<_>>();
1414        assert_eq!(
1415            addresses,
1416            vec![Some("replica1:6379"), None, None, Some("node3:6379")]
1417        );
1418    }
1419}