redis/cluster_handling/
routing.rs

1use rand::Rng;
2
3use crate::cluster_handling::slot_map::SLOT_SIZE;
4use crate::cmd::{Arg, Cmd};
5use crate::commands::is_readonly_cmd;
6use crate::types::Value;
7use crate::{ErrorKind, RedisError, RedisResult};
8use std::borrow::Cow;
9use std::cmp::min;
10use std::collections::HashMap;
11
12fn slot(key: &[u8]) -> u16 {
13    crc16::State::<crc16::XMODEM>::calculate(key) % SLOT_SIZE
14}
15
16/// Returns the slot that matches `key`.
17pub(crate) fn get_slot(key: &[u8]) -> u16 {
18    let key = match get_hashtag(key) {
19        Some(tag) => tag,
20        None => key,
21    };
22
23    slot(key)
24}
25
26#[derive(Clone, PartialEq, Debug)]
27pub(crate) enum Redirect {
28    Moved(String),
29    Ask(String),
30}
31
32/// Logical bitwise aggregating operators.
33#[derive(Debug, Clone, Copy, PartialEq)]
34pub enum LogicalAggregateOp {
35    /// Aggregate by bitwise &&
36    And,
37    // Or, omitted due to dead code warnings. ATM this value isn't constructed anywhere
38}
39
40/// Numerical aggregating operators.
41#[derive(Debug, Clone, Copy, PartialEq)]
42pub enum AggregateOp {
43    /// Choose minimal value
44    Min,
45    /// Sum all values
46    Sum,
47    // Max, omitted due to dead code warnings. ATM this value isn't constructed anywhere
48}
49
50/// Policy defining how to combine multiple responses into one.
51#[derive(Debug, Clone, Copy, PartialEq)]
52pub enum ResponsePolicy {
53    /// Wait for one request to succeed and return its results. Return error if all requests fail.
54    OneSucceeded,
55    /// Returns the first succeeded non-empty result; if all results are empty, returns `Nil`; otherwise, returns the last received error.
56    FirstSucceededNonEmptyOrAllEmpty,
57    /// Waits for all requests to succeed, and the returns one of the successes. Returns the error on the first received error.
58    AllSucceeded,
59    /// Aggregate success results according to a logical bitwise operator. Return error on any failed request or on a response that doesn't conform to 0 or 1.
60    AggregateLogical(LogicalAggregateOp),
61    /// Aggregate success results according to a numeric operator. Return error on any failed request or on a response that isn't an integer.
62    Aggregate(AggregateOp),
63    /// Aggregate array responses into a single array. Return error on any failed request or on a response that isn't an array.
64    CombineArrays,
65    /// Handling is not defined by the Redis standard. Will receive a special case
66    Special,
67    /// Combines multiple map responses into a single map.
68    CombineMaps,
69}
70
71/// Defines whether a request should be routed to a single node, or multiple ones.
72#[derive(Debug, Clone, PartialEq)]
73pub enum RoutingInfo {
74    /// Route to single node
75    SingleNode(SingleNodeRoutingInfo),
76    /// Route to multiple nodes
77    MultiNode((MultipleNodeRoutingInfo, Option<ResponsePolicy>)),
78}
79
80/// Defines which single node should receive a request.
81#[derive(Debug, Clone, PartialEq)]
82pub enum SingleNodeRoutingInfo {
83    /// Route to any node at random
84    Random,
85    /// Route to any *primary* node
86    RandomPrimary,
87    /// Route to the node that matches the [Route]
88    SpecificNode(Route),
89    /// Route to the node with the given address.
90    ByAddress {
91        /// DNS hostname of the node
92        host: String,
93        /// port of the node
94        port: u16,
95    },
96}
97
98impl From<Option<Route>> for SingleNodeRoutingInfo {
99    fn from(value: Option<Route>) -> Self {
100        value
101            .map(SingleNodeRoutingInfo::SpecificNode)
102            .unwrap_or(SingleNodeRoutingInfo::Random)
103    }
104}
105
106/// Defines which collection of nodes should receive a request
107#[derive(Debug, Clone, PartialEq)]
108pub enum MultipleNodeRoutingInfo {
109    /// Route to all nodes in the clusters
110    AllNodes,
111    /// Route to all primaries in the cluster
112    AllMasters,
113    /// Routes the request to multiple slots.
114    /// This variant contains instructions for splitting a multi-slot command (e.g., MGET, MSET) into sub-commands.
115    /// Each tuple consists of a `Route` representing the target node for the subcommand,
116    /// and a vector of argument indices from the original command that should be copied to each subcommand.
117    /// The `MultiSlotArgPattern` specifies the pattern of the command’s arguments, indicating how they are organized
118    /// (e.g., only keys, key-value pairs, etc).
119    MultiSlot((Vec<(Route, Vec<usize>)>, MultiSlotArgPattern)),
120}
121
122/// Takes a routable and an iterator of indices, which is assued to be created from`MultipleNodeRoutingInfo::MultiSlot`,
123/// and returns a command with the arguments matching the indices.
124pub(crate) fn command_for_multi_slot_indices<'a, 'b>(
125    original_cmd: &'a impl Routable,
126    indices: impl Iterator<Item = &'b usize> + 'a,
127) -> Cmd
128where
129    'b: 'a,
130{
131    let mut new_cmd = Cmd::new();
132    let command_length = 1; // TODO - the +1 should change if we have multi-slot commands with 2 command words.
133    new_cmd.arg(original_cmd.arg_idx(0));
134    for index in indices {
135        new_cmd.arg(original_cmd.arg_idx(index + command_length));
136    }
137    new_cmd
138}
139
140/// Aggreagte numeric responses.
141pub(crate) fn aggregate(values: Vec<Value>, op: AggregateOp) -> RedisResult<Value> {
142    let initial_value = match op {
143        AggregateOp::Min => i64::MAX,
144        AggregateOp::Sum => 0,
145    };
146    let result = values.into_iter().try_fold(initial_value, |acc, curr| {
147        let int = match curr {
148            Value::Int(int) => int,
149            _ => {
150                return RedisResult::Err(
151                    (
152                        ErrorKind::TypeError,
153                        "expected array of integers as response",
154                    )
155                        .into(),
156                );
157            }
158        };
159        let acc = match op {
160            AggregateOp::Min => min(acc, int),
161            AggregateOp::Sum => acc + int,
162        };
163        Ok(acc)
164    })?;
165    Ok(Value::Int(result))
166}
167
168/// Aggreagte numeric responses by a boolean operator.
169pub(crate) fn logical_aggregate(values: Vec<Value>, op: LogicalAggregateOp) -> RedisResult<Value> {
170    let initial_value = match op {
171        LogicalAggregateOp::And => true,
172    };
173    let results = values.into_iter().try_fold(Vec::new(), |acc, curr| {
174        let values = match curr {
175            Value::Array(values) => values,
176            _ => {
177                return RedisResult::Err(
178                    (
179                        ErrorKind::TypeError,
180                        "expected array of integers as response",
181                    )
182                        .into(),
183                );
184            }
185        };
186        let mut acc = if acc.is_empty() {
187            vec![initial_value; values.len()]
188        } else {
189            acc
190        };
191        for (index, value) in values.into_iter().enumerate() {
192            let int = match value {
193                Value::Int(int) => int,
194                _ => {
195                    return Err((
196                        ErrorKind::TypeError,
197                        "expected array of integers as response",
198                    )
199                        .into());
200                }
201            };
202            acc[index] = match op {
203                LogicalAggregateOp::And => acc[index] && (int > 0),
204            };
205        }
206        Ok(acc)
207    })?;
208    Ok(Value::Array(
209        results
210            .into_iter()
211            .map(|result| Value::Int(result as i64))
212            .collect(),
213    ))
214}
215/// Aggregate array responses into a single map.
216pub(crate) fn combine_map_results(values: Vec<Value>) -> RedisResult<Value> {
217    let mut map: HashMap<Vec<u8>, i64> = HashMap::new();
218
219    for value in values {
220        match value {
221            Value::Array(elements) => {
222                let mut iter = elements.into_iter();
223
224                while let Some(key) = iter.next() {
225                    if let Value::BulkString(key_bytes) = key {
226                        if let Some(Value::Int(value)) = iter.next() {
227                            *map.entry(key_bytes).or_insert(0) += value;
228                        } else {
229                            return Err((ErrorKind::TypeError, "expected integer value").into());
230                        }
231                    } else {
232                        return Err((ErrorKind::TypeError, "expected string key").into());
233                    }
234                }
235            }
236            _ => {
237                return Err((ErrorKind::TypeError, "expected array of values as response").into());
238            }
239        }
240    }
241
242    let result_vec: Vec<(Value, Value)> = map
243        .into_iter()
244        .map(|(k, v)| (Value::BulkString(k), Value::Int(v)))
245        .collect();
246
247    Ok(Value::Map(result_vec))
248}
249
250/// Aggregate array responses into a single array.
251pub(crate) fn combine_array_results(values: Vec<Value>) -> RedisResult<Value> {
252    let mut results = Vec::new();
253
254    for value in values {
255        match value {
256            Value::Array(values) => results.extend(values),
257            _ => {
258                return Err((ErrorKind::TypeError, "expected array of values as response").into());
259            }
260        }
261    }
262
263    Ok(Value::Array(results))
264}
265
266// An iterator that yields `Cow<[usize]>` representing grouped result indices according to a specified argument pattern.
267// This type is used to combine multi-slot array responses.
268type MultiSlotResIdxIter<'a> = std::iter::Map<
269    std::slice::Iter<'a, (Route, Vec<usize>)>,
270    fn(&'a (Route, Vec<usize>)) -> Cow<'a, [usize]>,
271>;
272
273/// Generates an iterator that yields a vector of result indices for each slot within the final merged results array for a multi-slot command response.
274/// The indices are calculated based on the `args_pattern` and the positions of the arguments for each slot-specific request in the original multi-slot request,
275/// ensuring that the results are ordered according to the structure of the initial multi-slot command.
276///
277/// # Arguments
278/// * `route_arg_indices` - A reference to a vector where each element is a tuple containing a route and
279///   the corresponding argument indices for that route.
280/// * `args_pattern` - Specifies the argument pattern (e.g., `KeysOnly`, `KeyValuePairs`, ..), which defines how the indices are grouped for each slot.
281///
282/// # Returns
283/// An iterator yielding `Cow<[usize]>` with the grouped result indices based on the specified argument pattern.
284///
285/// /// For example, given the command `MSET foo bar foo2 bar2 {foo}foo3 bar3` with the `KeyValuePairs` pattern:
286/// - `route_arg_indices` would include:
287///   - Slot of "foo" with argument indices `[0, 1, 4, 5]` (where `{foo}foo3` hashes to the same slot as "foo" due to curly braces).
288///   - Slot of "foo2" with argument indices `[2, 3]`.
289/// - Using the `KeyValuePairs` pattern, each key-value pair contributes a single response, yielding three responses total.
290/// - Therefore, the iterator generated by this function would yield grouped result indices as follows:
291///   - Slot "foo" is mapped to `[0, 2]` in the final result order.
292///   - Slot "foo2" is mapped to `[1]`.
293fn calculate_multi_slot_result_indices<'a>(
294    route_arg_indices: &'a [(Route, Vec<usize>)],
295    args_pattern: &MultiSlotArgPattern,
296) -> RedisResult<MultiSlotResIdxIter<'a>> {
297    let check_indices_input = |step_count: usize| {
298        for (_, indices) in route_arg_indices {
299            if indices.len() % step_count != 0 {
300                return Err(RedisError::from((
301                    ErrorKind::ClientError,
302                    "Invalid indices input detected",
303                    format!(
304                        "Expected argument pattern with tuples of size {step_count}, but found indices: {indices:?}"
305                    ),
306                )));
307            }
308        }
309        Ok(())
310    };
311
312    match args_pattern {
313        MultiSlotArgPattern::KeysOnly => Ok(route_arg_indices
314            .iter()
315            .map(|(_, indices)| Cow::Borrowed(indices))),
316        MultiSlotArgPattern::KeysAndLastArg => {
317            // The last index corresponds to the path, skip it
318            Ok(route_arg_indices
319                .iter()
320                .map(|(_, indices)| Cow::Borrowed(&indices[..indices.len() - 1])))
321        }
322        MultiSlotArgPattern::KeyWithTwoArgTriples => {
323            // For each triplet (key, path, value) we receive a single response.
324            // For example, for argument indices: [(_, [0,1,2]), (_, [3,4,5,9,10,11]), (_, [6,7,8])]
325            // The resulting grouped indices would be: [0], [1, 3], [2]
326            check_indices_input(3)?;
327            Ok(route_arg_indices.iter().map(|(_, indices)| {
328                Cow::Owned(
329                    indices
330                        .iter()
331                        .step_by(3)
332                        .map(|idx| idx / 3)
333                        .collect::<Vec<usize>>(),
334                )
335            }))
336        }
337        MultiSlotArgPattern::KeyValuePairs =>
338        // For each pair (key, value) we receive a single response.
339        // For example, for argument indices: [(_, [0,1]), (_, [2,3,6,7]), (_, [4,5])]
340        // The resulting grouped indices would be: [0], [1, 3], [2]
341        {
342            check_indices_input(2)?;
343            Ok(route_arg_indices.iter().map(|(_, indices)| {
344                Cow::Owned(
345                    indices
346                        .iter()
347                        .step_by(2)
348                        .map(|idx| idx / 2)
349                        .collect::<Vec<usize>>(),
350                )
351            }))
352        }
353    }
354}
355
356/// Merges the results of a multi-slot command from the `values` field, where each entry is expected to be an array of results.
357/// The combined results are ordered according to the sequence in which they appeared in the original command.
358///
359/// # Arguments
360///
361/// * `values` - A vector of `Value`s, where each `Value` is expected to be an array representing results
362///   from separate slots in a multi-slot command. Each `Value::Array` within `values` corresponds to
363///   the results associated with a specific slot, as indicated by `route_arg_indices`.
364///
365/// * `route_arg_indices` - A reference to a vector of tuples, where each tuple represents a route and a vector of
366///   argument indices associated with that route. The route indicates the slot, while the indices vector
367///   specifies the positions of arguments relevant to this slot. This is used to construct `sorting_order`,
368///   which guides the placement of results in the final array.
369///
370/// * `args_pattern` - Specifies the argument pattern (e.g., `KeysOnly`, `KeyValuePairs`, ...).
371///   The pattern defines how the argument indices are grouped for each slot and determines
372///   the ordering of results from `values` as they are placed in the final combined array.
373///
374/// # Returns
375///
376/// Returns a `RedisResult<Value>` containing the final ordered array (`Value::Array`) of combined results.
377pub(crate) fn combine_and_sort_array_results(
378    values: Vec<Value>,
379    route_arg_indices: &[(Route, Vec<usize>)],
380    args_pattern: &MultiSlotArgPattern,
381) -> RedisResult<Value> {
382    let result_indices = calculate_multi_slot_result_indices(route_arg_indices, args_pattern)?;
383    let mut results = Vec::new();
384    results.resize(
385        values.iter().fold(0, |acc, value| match value {
386            Value::Array(values) => values.len() + acc,
387            _ => 0,
388        }),
389        Value::Nil,
390    );
391    if values.len() != result_indices.len() {
392        return Err(RedisError::from((
393            ErrorKind::ClientError,
394            "Mismatch in the number of multi-slot results compared to the expected result count.",
395            format!(
396                "Expected: {:?}, Found: {:?}",
397                values.len(),
398                result_indices.len()
399            ),
400        )));
401    }
402
403    for (key_indices, value) in result_indices.into_iter().zip(values) {
404        match value {
405            Value::Array(values) => {
406                assert_eq!(values.len(), key_indices.len());
407                for (index, value) in key_indices.iter().zip(values) {
408                    results[*index] = value;
409                }
410            }
411            _ => {
412                return Err((ErrorKind::TypeError, "expected array of values as response").into());
413            }
414        }
415    }
416
417    Ok(Value::Array(results))
418}
419
420fn get_route(is_readonly: bool, key: &[u8]) -> Route {
421    let slot = get_slot(key);
422    if is_readonly {
423        Route::new(slot, SlotAddr::ReplicaOptional)
424    } else {
425        Route::new(slot, SlotAddr::Master)
426    }
427}
428
429/// Represents the pattern of argument structures in multi-slot commands,
430/// defining how the arguments are organized in the command.
431#[derive(Debug, Clone, PartialEq)]
432pub enum MultiSlotArgPattern {
433    /// Pattern where only keys are provided in the command.
434    /// For example: `MGET key1 key2`
435    KeysOnly,
436
437    /// Pattern where each key is followed by a corresponding value.
438    /// For example: `MSET key1 value1 key2 value2`
439    KeyValuePairs,
440
441    /// Pattern where a list of keys is followed by a shared parameter.
442    /// For example: `JSON.MGET key1 key2 key3 path`
443    KeysAndLastArg,
444
445    /// Pattern where each key is followed by two associated arguments, forming key-argument-argument triples.
446    /// For example: `JSON.MSET key1 path1 value1 key2 path2 value2`
447    KeyWithTwoArgTriples,
448}
449
450/// Takes the given `routable` and creates a multi-slot routing info.
451/// This is used for commands like MSET & MGET, where if the command's keys
452/// are hashed to multiple slots, the command should be split into sub-commands,
453/// each targetting a single slot. The results of these sub-commands are then
454/// usually reassembled using `combine_and_sort_array_results`. In order to do this,
455/// `MultipleNodeRoutingInfo::MultiSlot` contains the routes for each sub-command, and
456/// the indices in the final combined result for each result from the sub-command.
457///
458/// If all keys are routed to the same slot, there's no need to split the command,
459/// so a single node routing info will be returned.
460///
461/// # Arguments
462/// * `routable` - The command or structure containing key-related data that can be routed.
463/// * `cmd` - A byte slice representing the command name or opcode (e.g., `b"MGET"`).
464/// * `first_key_index` - The starting index in the command where the first key is located.
465/// * `args_pattern` - Specifies how keys and values are patterned in the command (e.g., `OnlyKeys`, `KeyValuePairs`).
466///
467/// # Returns
468/// `Some(RoutingInfo)` if routing info is created, indicating the command targets multiple slots or a single slot;
469/// `None` if no routing info could be derived.
470fn multi_shard<R>(
471    routable: &R,
472    cmd: &[u8],
473    first_key_index: usize,
474    args_pattern: MultiSlotArgPattern,
475) -> Option<RoutingInfo>
476where
477    R: Routable + ?Sized,
478{
479    let is_readonly = is_readonly_cmd(cmd);
480    let mut routes = HashMap::new();
481    let mut curr_arg_idx = 0;
482    let incr_add_next_arg = |arg_indices: &mut Vec<usize>, mut curr_arg_idx: usize| {
483        curr_arg_idx += 1;
484        // Ensure there's a value following the key
485        routable.arg_idx(curr_arg_idx)?;
486        arg_indices.push(curr_arg_idx);
487        Some(curr_arg_idx)
488    };
489    while let Some(arg) = routable.arg_idx(first_key_index + curr_arg_idx) {
490        let route = get_route(is_readonly, arg);
491        let arg_indices = routes.entry(route).or_insert(Vec::new());
492
493        arg_indices.push(curr_arg_idx);
494
495        match args_pattern {
496            MultiSlotArgPattern::KeysOnly => {} // no additional handling needed for keys-only commands
497            MultiSlotArgPattern::KeyValuePairs => {
498                // Increment to the value paired with the current key and add its index
499                curr_arg_idx = incr_add_next_arg(arg_indices, curr_arg_idx)?;
500            }
501            MultiSlotArgPattern::KeysAndLastArg => {
502                // Check if the command has more keys or if the next argument is a path
503                if routable
504                    .arg_idx(first_key_index + curr_arg_idx + 2)
505                    .is_none()
506                {
507                    // Last key reached; add the path argument index for each route and break
508                    let path_idx = curr_arg_idx + 1;
509                    for (_, arg_indices) in routes.iter_mut() {
510                        arg_indices.push(path_idx);
511                    }
512                    break;
513                }
514            }
515            MultiSlotArgPattern::KeyWithTwoArgTriples => {
516                // Increment to the first argument associated with the current key and add its index
517                curr_arg_idx = incr_add_next_arg(arg_indices, curr_arg_idx)?;
518                // Increment to the second argument associated with the current key and add its index
519                curr_arg_idx = incr_add_next_arg(arg_indices, curr_arg_idx)?;
520            }
521        }
522        curr_arg_idx += 1;
523    }
524
525    let mut routes: Vec<(Route, Vec<usize>)> = routes.into_iter().collect();
526    if routes.is_empty() {
527        return None;
528    }
529
530    Some(if routes.len() == 1 {
531        RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(routes.pop().unwrap().0))
532    } else {
533        RoutingInfo::MultiNode((
534            MultipleNodeRoutingInfo::MultiSlot((routes, args_pattern)),
535            ResponsePolicy::for_command(cmd),
536        ))
537    })
538}
539
540impl ResponsePolicy {
541    /// Parse the command for the matching response policy.
542    pub(crate) fn for_command(cmd: &[u8]) -> Option<ResponsePolicy> {
543        match cmd {
544            b"SCRIPT EXISTS" => Some(ResponsePolicy::AggregateLogical(LogicalAggregateOp::And)),
545
546            b"DBSIZE" | b"DEL" | b"EXISTS" | b"SLOWLOG LEN" | b"TOUCH" | b"UNLINK"
547            | b"LATENCY RESET" | b"PUBSUB NUMPAT" => {
548                Some(ResponsePolicy::Aggregate(AggregateOp::Sum))
549            }
550
551            b"WAIT" => Some(ResponsePolicy::Aggregate(AggregateOp::Min)),
552
553            b"ACL SETUSER" | b"ACL DELUSER" | b"ACL SAVE" | b"CLIENT SETNAME"
554            | b"CLIENT SETINFO" | b"CONFIG SET" | b"CONFIG RESETSTAT" | b"CONFIG REWRITE"
555            | b"FLUSHALL" | b"FLUSHDB" | b"FUNCTION DELETE" | b"FUNCTION FLUSH"
556            | b"FUNCTION LOAD" | b"FUNCTION RESTORE" | b"MEMORY PURGE" | b"MSET" | b"JSON.MSET"
557            | b"PING" | b"SCRIPT FLUSH" | b"SCRIPT LOAD" | b"SLOWLOG RESET" | b"UNWATCH"
558            | b"WATCH" => Some(ResponsePolicy::AllSucceeded),
559
560            b"KEYS"
561            | b"FT._ALIASLIST"
562            | b"FT._LIST"
563            | b"MGET"
564            | b"JSON.MGET"
565            | b"SLOWLOG GET"
566            | b"PUBSUB CHANNELS"
567            | b"PUBSUB SHARDCHANNELS" => Some(ResponsePolicy::CombineArrays),
568
569            b"PUBSUB NUMSUB" | b"PUBSUB SHARDNUMSUB" => Some(ResponsePolicy::CombineMaps),
570
571            b"FUNCTION KILL" | b"SCRIPT KILL" => Some(ResponsePolicy::OneSucceeded),
572
573            // This isn't based on response_tips, but on the discussion here - https://github.com/redis/redis/issues/12410
574            b"RANDOMKEY" => Some(ResponsePolicy::FirstSucceededNonEmptyOrAllEmpty),
575
576            b"LATENCY GRAPH" | b"LATENCY HISTOGRAM" | b"LATENCY HISTORY" | b"LATENCY DOCTOR"
577            | b"LATENCY LATEST" => Some(ResponsePolicy::Special),
578
579            b"FUNCTION STATS" => Some(ResponsePolicy::Special),
580
581            b"MEMORY MALLOC-STATS" | b"MEMORY DOCTOR" | b"MEMORY STATS" => {
582                Some(ResponsePolicy::Special)
583            }
584
585            b"INFO" => Some(ResponsePolicy::Special),
586
587            _ => None,
588        }
589    }
590}
591
592enum RouteBy {
593    AllNodes,
594    AllPrimaries,
595    FirstKey,
596    MultiShard(MultiSlotArgPattern),
597    Random,
598    SecondArg,
599    SecondArgAfterKeyCount,
600    SecondArgSlot,
601    StreamsIndex,
602    ThirdArgAfterKeyCount,
603    Undefined,
604}
605
606fn base_routing(cmd: &[u8]) -> RouteBy {
607    match cmd {
608        b"ACL SETUSER"
609        | b"ACL DELUSER"
610        | b"ACL SAVE"
611        | b"CLIENT SETNAME"
612        | b"CLIENT SETINFO"
613        | b"SLOWLOG GET"
614        | b"SLOWLOG LEN"
615        | b"SLOWLOG RESET"
616        | b"CONFIG SET"
617        | b"CONFIG RESETSTAT"
618        | b"CONFIG REWRITE"
619        | b"SCRIPT FLUSH"
620        | b"SCRIPT LOAD"
621        | b"LATENCY RESET"
622        | b"LATENCY GRAPH"
623        | b"LATENCY HISTOGRAM"
624        | b"LATENCY HISTORY"
625        | b"LATENCY DOCTOR"
626        | b"LATENCY LATEST"
627        | b"PUBSUB NUMPAT"
628        | b"PUBSUB CHANNELS"
629        | b"PUBSUB NUMSUB"
630        | b"PUBSUB SHARDCHANNELS"
631        | b"PUBSUB SHARDNUMSUB"
632        | b"SCRIPT KILL"
633        | b"FUNCTION KILL"
634        | b"FUNCTION STATS" => RouteBy::AllNodes,
635
636        b"DBSIZE"
637        | b"DEBUG"
638        | b"FLUSHALL"
639        | b"FLUSHDB"
640        | b"FT._ALIASLIST"
641        | b"FT._LIST"
642        | b"FUNCTION DELETE"
643        | b"FUNCTION FLUSH"
644        | b"FUNCTION LOAD"
645        | b"FUNCTION RESTORE"
646        | b"INFO"
647        | b"KEYS"
648        | b"MEMORY DOCTOR"
649        | b"MEMORY MALLOC-STATS"
650        | b"MEMORY PURGE"
651        | b"MEMORY STATS"
652        | b"PING"
653        | b"SCRIPT EXISTS"
654        | b"UNWATCH"
655        | b"WAIT"
656        | b"RANDOMKEY"
657        | b"WAITAOF" => RouteBy::AllPrimaries,
658
659        b"MGET" | b"DEL" | b"EXISTS" | b"UNLINK" | b"TOUCH" | b"WATCH" => {
660            RouteBy::MultiShard(MultiSlotArgPattern::KeysOnly)
661        }
662
663        b"MSET" => RouteBy::MultiShard(MultiSlotArgPattern::KeyValuePairs),
664        b"JSON.MGET" => RouteBy::MultiShard(MultiSlotArgPattern::KeysAndLastArg),
665        b"JSON.MSET" => RouteBy::MultiShard(MultiSlotArgPattern::KeyWithTwoArgTriples),
666        // TODO - special handling - b"SCAN"
667        b"SCAN" | b"SHUTDOWN" | b"SLAVEOF" | b"REPLICAOF" => RouteBy::Undefined,
668
669        b"BLMPOP" | b"BZMPOP" | b"EVAL" | b"EVALSHA" | b"EVALSHA_RO" | b"EVAL_RO" | b"FCALL"
670        | b"FCALL_RO" => RouteBy::ThirdArgAfterKeyCount,
671
672        b"BITOP"
673        | b"MEMORY USAGE"
674        | b"PFDEBUG"
675        | b"XGROUP CREATE"
676        | b"XGROUP CREATECONSUMER"
677        | b"XGROUP DELCONSUMER"
678        | b"XGROUP DESTROY"
679        | b"XGROUP SETID"
680        | b"XINFO CONSUMERS"
681        | b"XINFO GROUPS"
682        | b"XINFO STREAM"
683        | b"OBJECT ENCODING"
684        | b"OBJECT FREQ"
685        | b"OBJECT IDLETIME"
686        | b"OBJECT REFCOUNT"
687        | b"JSON.DEBUG" => RouteBy::SecondArg,
688
689        b"LMPOP" | b"SINTERCARD" | b"ZDIFF" | b"ZINTER" | b"ZINTERCARD" | b"ZMPOP" | b"ZUNION" => {
690            RouteBy::SecondArgAfterKeyCount
691        }
692
693        b"XREAD" | b"XREADGROUP" => RouteBy::StreamsIndex,
694
695        // keyless commands with more arguments, whose arguments might be wrongly taken to be keys.
696        // TODO - double check these, in order to find better ways to route some of them.
697        b"ACL DRYRUN"
698        | b"ACL GENPASS"
699        | b"ACL GETUSER"
700        | b"ACL HELP"
701        | b"ACL LIST"
702        | b"ACL LOG"
703        | b"ACL USERS"
704        | b"ACL WHOAMI"
705        | b"AUTH"
706        | b"BGSAVE"
707        | b"CLIENT GETNAME"
708        | b"CLIENT GETREDIR"
709        | b"CLIENT ID"
710        | b"CLIENT INFO"
711        | b"CLIENT KILL"
712        | b"CLIENT PAUSE"
713        | b"CLIENT REPLY"
714        | b"CLIENT TRACKINGINFO"
715        | b"CLIENT UNBLOCK"
716        | b"CLIENT UNPAUSE"
717        | b"CLUSTER COUNT-FAILURE-REPORTS"
718        | b"CLUSTER INFO"
719        | b"CLUSTER KEYSLOT"
720        | b"CLUSTER MEET"
721        | b"CLUSTER MYSHARDID"
722        | b"CLUSTER NODES"
723        | b"CLUSTER REPLICAS"
724        | b"CLUSTER RESET"
725        | b"CLUSTER SET-CONFIG-EPOCH"
726        | b"CLUSTER SHARDS"
727        | b"CLUSTER SLOTS"
728        | b"COMMAND COUNT"
729        | b"COMMAND GETKEYS"
730        | b"COMMAND LIST"
731        | b"COMMAND"
732        | b"CONFIG GET"
733        | b"ECHO"
734        | b"FUNCTION LIST"
735        | b"LASTSAVE"
736        | b"LOLWUT"
737        | b"MODULE LIST"
738        | b"MODULE LOAD"
739        | b"MODULE LOADEX"
740        | b"MODULE UNLOAD"
741        | b"READONLY"
742        | b"READWRITE"
743        | b"SAVE"
744        | b"SCRIPT SHOW"
745        | b"TFCALL"
746        | b"TFCALLASYNC"
747        | b"TFUNCTION DELETE"
748        | b"TFUNCTION LIST"
749        | b"TFUNCTION LOAD"
750        | b"TIME" => RouteBy::Random,
751
752        b"CLUSTER ADDSLOTS"
753        | b"CLUSTER COUNTKEYSINSLOT"
754        | b"CLUSTER DELSLOTS"
755        | b"CLUSTER DELSLOTSRANGE"
756        | b"CLUSTER GETKEYSINSLOT"
757        | b"CLUSTER SETSLOT" => RouteBy::SecondArgSlot,
758
759        _ => RouteBy::FirstKey,
760    }
761}
762
763impl RoutingInfo {
764    /// Returns the routing info for `r`.
765    pub(crate) fn for_routable<R>(r: &R) -> Option<RoutingInfo>
766    where
767        R: Routable + ?Sized,
768    {
769        let cmd = &r.command()?[..];
770        match base_routing(cmd) {
771            RouteBy::AllNodes => Some(RoutingInfo::MultiNode((
772                MultipleNodeRoutingInfo::AllNodes,
773                ResponsePolicy::for_command(cmd),
774            ))),
775
776            RouteBy::AllPrimaries => Some(RoutingInfo::MultiNode((
777                MultipleNodeRoutingInfo::AllMasters,
778                ResponsePolicy::for_command(cmd),
779            ))),
780
781            RouteBy::MultiShard(arg_pattern) => multi_shard(r, cmd, 1, arg_pattern),
782
783            RouteBy::Random => Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)),
784
785            RouteBy::ThirdArgAfterKeyCount => {
786                let key_count = r
787                    .arg_idx(2)
788                    .and_then(|x| std::str::from_utf8(x).ok())
789                    .and_then(|x| x.parse::<u64>().ok())?;
790                if key_count == 0 {
791                    Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
792                } else {
793                    r.arg_idx(3).map(|key| RoutingInfo::for_key(cmd, key))
794                }
795            }
796
797            RouteBy::SecondArg => r.arg_idx(2).map(|key| RoutingInfo::for_key(cmd, key)),
798
799            RouteBy::SecondArgAfterKeyCount => {
800                let key_count = r
801                    .arg_idx(1)
802                    .and_then(|x| std::str::from_utf8(x).ok())
803                    .and_then(|x| x.parse::<u64>().ok())?;
804                if key_count == 0 {
805                    Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
806                } else {
807                    r.arg_idx(2).map(|key| RoutingInfo::for_key(cmd, key))
808                }
809            }
810
811            RouteBy::StreamsIndex => {
812                let streams_position = r.position(b"STREAMS")?;
813                r.arg_idx(streams_position + 1)
814                    .map(|key| RoutingInfo::for_key(cmd, key))
815            }
816
817            RouteBy::SecondArgSlot => r
818                .arg_idx(2)
819                .and_then(|arg| std::str::from_utf8(arg).ok())
820                .and_then(|slot| slot.parse::<u16>().ok())
821                .map(|slot| {
822                    RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new(
823                        slot,
824                        SlotAddr::Master,
825                    )))
826                }),
827
828            RouteBy::FirstKey => match r.arg_idx(1) {
829                Some(key) => Some(RoutingInfo::for_key(cmd, key)),
830                None => Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)),
831            },
832
833            RouteBy::Undefined => None,
834        }
835    }
836
837    fn for_key(cmd: &[u8], key: &[u8]) -> RoutingInfo {
838        RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(get_route(
839            is_readonly_cmd(cmd),
840            key,
841        )))
842    }
843}
844
845/// Objects that implement this trait define a request that can be routed by a cluster client to different nodes in the cluster.
846pub(crate) trait Routable {
847    /// Convenience function to return ascii uppercase version of the
848    /// the first argument (i.e., the command).
849    fn command(&self) -> Option<Vec<u8>> {
850        let primary_command = self.arg_idx(0).map(|x| x.to_ascii_uppercase())?;
851        let mut primary_command = match primary_command.as_slice() {
852            b"XGROUP" | b"OBJECT" | b"SLOWLOG" | b"FUNCTION" | b"MODULE" | b"COMMAND"
853            | b"PUBSUB" | b"CONFIG" | b"MEMORY" | b"XINFO" | b"CLIENT" | b"ACL" | b"SCRIPT"
854            | b"CLUSTER" | b"LATENCY" => primary_command,
855            _ => {
856                return Some(primary_command);
857            }
858        };
859
860        Some(match self.arg_idx(1) {
861            Some(secondary_command) => {
862                let previous_len = primary_command.len();
863                primary_command.reserve(secondary_command.len() + 1);
864                primary_command.extend(b" ");
865                primary_command.extend(secondary_command);
866                let current_len = primary_command.len();
867                primary_command[previous_len + 1..current_len].make_ascii_uppercase();
868                primary_command
869            }
870            None => primary_command,
871        })
872    }
873
874    /// Returns a reference to the data for the argument at `idx`.
875    fn arg_idx(&self, idx: usize) -> Option<&[u8]>;
876
877    /// Returns index of argument that matches `candidate`, if it exists
878    fn position(&self, candidate: &[u8]) -> Option<usize>;
879}
880
881impl Routable for Cmd {
882    fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
883        self.arg_idx(idx)
884    }
885
886    fn position(&self, candidate: &[u8]) -> Option<usize> {
887        self.args_iter().position(|a| match a {
888            Arg::Simple(d) => d.eq_ignore_ascii_case(candidate),
889            _ => false,
890        })
891    }
892}
893
894impl Routable for Value {
895    fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
896        match self {
897            Value::Array(args) => match args.get(idx) {
898                Some(Value::BulkString(ref data)) => Some(&data[..]),
899                _ => None,
900            },
901            _ => None,
902        }
903    }
904
905    fn position(&self, candidate: &[u8]) -> Option<usize> {
906        match self {
907            Value::Array(args) => args.iter().position(|a| match a {
908                Value::BulkString(d) => d.eq_ignore_ascii_case(candidate),
909                _ => false,
910            }),
911            _ => None,
912        }
913    }
914}
915
916#[derive(Debug, Hash, Clone)]
917pub(crate) struct Slot {
918    pub(crate) start: u16,
919    pub(crate) end: u16,
920    pub(crate) master: String,
921    pub(crate) replicas: Vec<String>,
922}
923
924impl Slot {
925    #[allow(dead_code)] // used in tests
926    pub(crate) fn master(&self) -> &str {
927        self.master.as_str()
928    }
929
930    #[allow(dead_code)] // used in tests
931    pub(crate) fn replicas(&self) -> Vec<String> {
932        self.replicas.clone()
933    }
934}
935
936/// What type of node should a request be routed to, assuming read from replica is enabled.
937#[derive(Eq, PartialEq, Clone, Copy, Debug, Hash)]
938pub enum SlotAddr {
939    /// The request must be routed to primary node
940    Master,
941    /// The request may be routed to a replica node.
942    /// For example, a GET command can be routed either to replica or primary.
943    ReplicaOptional,
944    /// The request must be routed to replica node, if one exists.
945    /// For example, by user requested routing.
946    ReplicaRequired,
947}
948
949/// Defines the slot and the [`SlotAddr`] to which
950/// a command should be sent
951#[derive(Eq, PartialEq, Clone, Copy, Debug, Hash)]
952pub struct Route(u16, SlotAddr);
953
954impl Route {
955    /// Returns a new Route.
956    pub fn new(slot: u16, slot_addr: SlotAddr) -> Self {
957        Self(slot, slot_addr)
958    }
959
960    /// Returns the slot number of the route.
961    pub(crate) fn slot(&self) -> u16 {
962        self.0
963    }
964
965    /// Returns the slot address of the route.
966    pub(crate) fn slot_addr(&self) -> SlotAddr {
967        self.1
968    }
969
970    /// Returns a new Route for a random primary node
971    pub(crate) fn new_random_primary() -> Self {
972        Self::new(random_slot(), SlotAddr::Master)
973    }
974}
975
976/// Choose a random slot from `0..SLOT_SIZE` (excluding)
977fn random_slot() -> u16 {
978    let mut rng = rand::rng();
979    rng.random_range(0..SLOT_SIZE)
980}
981
982fn get_hashtag(key: &[u8]) -> Option<&[u8]> {
983    let open = key.iter().position(|v| *v == b'{')?;
984
985    let close = key[open..].iter().position(|v| *v == b'}')?;
986
987    let rv = &key[open + 1..open + close];
988    (!rv.is_empty()).then_some(rv)
989}
990
991#[cfg(test)]
992mod tests_routing {
993    use super::{
994        command_for_multi_slot_indices, AggregateOp, MultiSlotArgPattern, MultipleNodeRoutingInfo,
995        ResponsePolicy, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr,
996    };
997    use crate::cluster_routing::slot;
998    use crate::{cmd, parser::parse_redis_value, Value};
999    use core::panic;
1000
1001    #[test]
1002    fn test_routing_info_mixed_capatalization() {
1003        let mut upper = cmd("XREAD");
1004        upper.arg("STREAMS").arg("foo").arg(0);
1005
1006        let mut lower = cmd("xread");
1007        lower.arg("streams").arg("foo").arg(0);
1008
1009        assert_eq!(
1010            RoutingInfo::for_routable(&upper).unwrap(),
1011            RoutingInfo::for_routable(&lower).unwrap()
1012        );
1013
1014        let mut mixed = cmd("xReAd");
1015        mixed.arg("StReAmS").arg("foo").arg(0);
1016
1017        assert_eq!(
1018            RoutingInfo::for_routable(&lower).unwrap(),
1019            RoutingInfo::for_routable(&mixed).unwrap()
1020        );
1021    }
1022
1023    #[test]
1024    fn test_routing_info() {
1025        let mut test_cmds = vec![];
1026
1027        // RoutingInfo::AllMasters
1028        let mut test_cmd = cmd("FLUSHALL");
1029        test_cmd.arg("");
1030        test_cmds.push(test_cmd);
1031
1032        // RoutingInfo::AllNodes
1033        test_cmd = cmd("ECHO");
1034        test_cmd.arg("");
1035        test_cmds.push(test_cmd);
1036
1037        // Routing key is 2nd arg ("42")
1038        test_cmd = cmd("SET");
1039        test_cmd.arg("42");
1040        test_cmds.push(test_cmd);
1041
1042        // Routing key is 3rd arg ("FOOBAR")
1043        test_cmd = cmd("XINFO");
1044        test_cmd.arg("GROUPS").arg("FOOBAR");
1045        test_cmds.push(test_cmd);
1046
1047        // Routing key is 3rd or 4th arg (3rd = "0" == RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
1048        test_cmd = cmd("EVAL");
1049        test_cmd.arg("FOO").arg("0").arg("BAR");
1050        test_cmds.push(test_cmd);
1051
1052        // Routing key is 3rd or 4th arg (3rd != "0" == RoutingInfo::Slot)
1053        test_cmd = cmd("EVAL");
1054        test_cmd.arg("FOO").arg("4").arg("BAR");
1055        test_cmds.push(test_cmd);
1056
1057        // Routing key position is variable, 3rd arg
1058        test_cmd = cmd("XREAD");
1059        test_cmd.arg("STREAMS").arg("4");
1060        test_cmds.push(test_cmd);
1061
1062        // Routing key position is variable, 4th arg
1063        test_cmd = cmd("XREAD");
1064        test_cmd.arg("FOO").arg("STREAMS").arg("4");
1065        test_cmds.push(test_cmd);
1066
1067        for cmd in test_cmds {
1068            let value = parse_redis_value(&cmd.get_packed_command()).unwrap();
1069            assert_eq!(
1070                RoutingInfo::for_routable(&value).unwrap(),
1071                RoutingInfo::for_routable(&cmd).unwrap(),
1072            );
1073        }
1074
1075        // Assert expected RoutingInfo explicitly:
1076
1077        for cmd in [cmd("FLUSHALL"), cmd("FLUSHDB"), cmd("PING")] {
1078            assert_eq!(
1079                RoutingInfo::for_routable(&cmd),
1080                Some(RoutingInfo::MultiNode((
1081                    MultipleNodeRoutingInfo::AllMasters,
1082                    Some(ResponsePolicy::AllSucceeded)
1083                )))
1084            );
1085        }
1086
1087        assert_eq!(
1088            RoutingInfo::for_routable(&cmd("DBSIZE")),
1089            Some(RoutingInfo::MultiNode((
1090                MultipleNodeRoutingInfo::AllMasters,
1091                Some(ResponsePolicy::Aggregate(AggregateOp::Sum))
1092            )))
1093        );
1094
1095        assert_eq!(
1096            RoutingInfo::for_routable(&cmd("SCRIPT KILL")),
1097            Some(RoutingInfo::MultiNode((
1098                MultipleNodeRoutingInfo::AllNodes,
1099                Some(ResponsePolicy::OneSucceeded)
1100            )))
1101        );
1102
1103        assert_eq!(
1104            RoutingInfo::for_routable(&cmd("INFO")),
1105            Some(RoutingInfo::MultiNode((
1106                MultipleNodeRoutingInfo::AllMasters,
1107                Some(ResponsePolicy::Special)
1108            )))
1109        );
1110
1111        assert_eq!(
1112            RoutingInfo::for_routable(&cmd("KEYS")),
1113            Some(RoutingInfo::MultiNode((
1114                MultipleNodeRoutingInfo::AllMasters,
1115                Some(ResponsePolicy::CombineArrays)
1116            )))
1117        );
1118
1119        for cmd in vec![
1120            cmd("SCAN"),
1121            cmd("SHUTDOWN"),
1122            cmd("SLAVEOF"),
1123            cmd("REPLICAOF"),
1124        ] {
1125            assert_eq!(
1126                RoutingInfo::for_routable(&cmd),
1127                None,
1128                "{}",
1129                std::str::from_utf8(cmd.arg_idx(0).unwrap()).unwrap()
1130            );
1131        }
1132
1133        for cmd in [
1134            cmd("EVAL").arg(r#"redis.call("PING");"#).arg(0),
1135            cmd("EVALSHA").arg(r#"redis.call("PING");"#).arg(0),
1136        ] {
1137            assert_eq!(
1138                RoutingInfo::for_routable(cmd),
1139                Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
1140            );
1141        }
1142
1143        // While FCALL with N keys is expected to be routed to a specific node
1144        assert_eq!(
1145            RoutingInfo::for_routable(cmd("FCALL").arg("foo").arg(1).arg("mykey")),
1146            Some(RoutingInfo::SingleNode(
1147                SingleNodeRoutingInfo::SpecificNode(Route::new(slot(b"mykey"), SlotAddr::Master))
1148            ))
1149        );
1150
1151        for (cmd, expected) in [
1152            (
1153                cmd("EVAL")
1154                    .arg(r#"redis.call("GET, KEYS[1]");"#)
1155                    .arg(1)
1156                    .arg("foo"),
1157                Some(RoutingInfo::SingleNode(
1158                    SingleNodeRoutingInfo::SpecificNode(Route::new(slot(b"foo"), SlotAddr::Master)),
1159                )),
1160            ),
1161            (
1162                cmd("XGROUP")
1163                    .arg("CREATE")
1164                    .arg("mystream")
1165                    .arg("workers")
1166                    .arg("$")
1167                    .arg("MKSTREAM"),
1168                Some(RoutingInfo::SingleNode(
1169                    SingleNodeRoutingInfo::SpecificNode(Route::new(
1170                        slot(b"mystream"),
1171                        SlotAddr::Master,
1172                    )),
1173                )),
1174            ),
1175            (
1176                cmd("XINFO").arg("GROUPS").arg("foo"),
1177                Some(RoutingInfo::SingleNode(
1178                    SingleNodeRoutingInfo::SpecificNode(Route::new(
1179                        slot(b"foo"),
1180                        SlotAddr::ReplicaOptional,
1181                    )),
1182                )),
1183            ),
1184            (
1185                cmd("XREADGROUP")
1186                    .arg("GROUP")
1187                    .arg("wkrs")
1188                    .arg("consmrs")
1189                    .arg("STREAMS")
1190                    .arg("mystream"),
1191                Some(RoutingInfo::SingleNode(
1192                    SingleNodeRoutingInfo::SpecificNode(Route::new(
1193                        slot(b"mystream"),
1194                        SlotAddr::Master,
1195                    )),
1196                )),
1197            ),
1198            (
1199                cmd("XREAD")
1200                    .arg("COUNT")
1201                    .arg("2")
1202                    .arg("STREAMS")
1203                    .arg("mystream")
1204                    .arg("writers")
1205                    .arg("0-0")
1206                    .arg("0-0"),
1207                Some(RoutingInfo::SingleNode(
1208                    SingleNodeRoutingInfo::SpecificNode(Route::new(
1209                        slot(b"mystream"),
1210                        SlotAddr::ReplicaOptional,
1211                    )),
1212                )),
1213            ),
1214        ] {
1215            assert_eq!(
1216                RoutingInfo::for_routable(cmd),
1217                expected,
1218                "{}",
1219                std::str::from_utf8(cmd.arg_idx(0).unwrap()).unwrap()
1220            );
1221        }
1222    }
1223
1224    #[test]
1225    fn test_slot_for_packed_cmd() {
1226        assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[
1227                42, 50, 13, 10, 36, 54, 13, 10, 69, 88, 73, 83, 84, 83, 13, 10, 36, 49, 54, 13, 10,
1228                244, 93, 23, 40, 126, 127, 253, 33, 89, 47, 185, 204, 171, 249, 96, 139, 13, 10
1229            ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::ReplicaOptional)))) if slot == 964));
1230
1231        assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[
1232                42, 54, 13, 10, 36, 51, 13, 10, 83, 69, 84, 13, 10, 36, 49, 54, 13, 10, 36, 241,
1233                197, 111, 180, 254, 5, 175, 143, 146, 171, 39, 172, 23, 164, 145, 13, 10, 36, 52,
1234                13, 10, 116, 114, 117, 101, 13, 10, 36, 50, 13, 10, 78, 88, 13, 10, 36, 50, 13, 10,
1235                80, 88, 13, 10, 36, 55, 13, 10, 49, 56, 48, 48, 48, 48, 48, 13, 10
1236            ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Master)))) if slot == 8352));
1237
1238        assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[
1239                42, 54, 13, 10, 36, 51, 13, 10, 83, 69, 84, 13, 10, 36, 49, 54, 13, 10, 169, 233,
1240                247, 59, 50, 247, 100, 232, 123, 140, 2, 101, 125, 221, 66, 170, 13, 10, 36, 52,
1241                13, 10, 116, 114, 117, 101, 13, 10, 36, 50, 13, 10, 78, 88, 13, 10, 36, 50, 13, 10,
1242                80, 88, 13, 10, 36, 55, 13, 10, 49, 56, 48, 48, 48, 48, 48, 13, 10
1243            ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Master)))) if slot == 5210));
1244    }
1245
1246    #[test]
1247    fn test_multi_shard_keys_only() {
1248        let mut cmd = cmd("DEL");
1249        cmd.arg("foo").arg("bar").arg("baz").arg("{bar}vaz");
1250        let routing = RoutingInfo::for_routable(&cmd);
1251        let mut expected = std::collections::HashMap::new();
1252        expected.insert(Route(4813, SlotAddr::Master), vec![2]);
1253        expected.insert(Route(5061, SlotAddr::Master), vec![1, 3]);
1254        expected.insert(Route(12182, SlotAddr::Master), vec![0]);
1255
1256        assert!(
1257            matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot((vec, args_pattern)), Some(ResponsePolicy::Aggregate(AggregateOp::Sum))))) if {
1258                let routes = vec.clone().into_iter().collect();
1259                expected == routes && args_pattern == MultiSlotArgPattern::KeysOnly
1260            }),
1261            "expected={expected:?}\nrouting={routing:?}"
1262        );
1263
1264        let mut cmd = crate::cmd("MGET");
1265        cmd.arg("foo").arg("bar").arg("baz").arg("{bar}vaz");
1266        let routing = RoutingInfo::for_routable(&cmd);
1267        let mut expected = std::collections::HashMap::new();
1268        expected.insert(Route(4813, SlotAddr::ReplicaOptional), vec![2]);
1269        expected.insert(Route(5061, SlotAddr::ReplicaOptional), vec![1, 3]);
1270        expected.insert(Route(12182, SlotAddr::ReplicaOptional), vec![0]);
1271
1272        assert!(
1273            matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot((vec, args_pattern)), Some(ResponsePolicy::CombineArrays)))) if {
1274                let routes = vec.clone().into_iter().collect();
1275                expected == routes && args_pattern == MultiSlotArgPattern::KeysOnly
1276            }),
1277            "expected={expected:?}\nrouting={routing:?}"
1278        );
1279    }
1280
1281    #[test]
1282    fn test_multi_shard_key_value_pairs() {
1283        let mut cmd = cmd("MSET");
1284        cmd.arg("foo") // key slot 12182
1285            .arg("bar") // value
1286            .arg("foo2") // key slot 1044
1287            .arg("bar2")    // value
1288            .arg("{foo}foo3") // key slot 12182
1289            .arg("bar3"); // value
1290        let routing = RoutingInfo::for_routable(&cmd);
1291        let mut expected = std::collections::HashMap::new();
1292        expected.insert(Route(1044, SlotAddr::Master), vec![2, 3]);
1293        expected.insert(Route(12182, SlotAddr::Master), vec![0, 1, 4, 5]);
1294
1295        assert!(
1296            matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot((vec, args_pattern)), Some(ResponsePolicy::AllSucceeded)))) if {
1297                let routes = vec.clone().into_iter().collect();
1298                expected == routes && args_pattern == MultiSlotArgPattern::KeyValuePairs
1299            }),
1300            "expected={expected:?}\nrouting={routing:?}"
1301        );
1302    }
1303
1304    #[test]
1305    fn test_multi_shard_keys_and_path() {
1306        let mut cmd = cmd("JSON.MGET");
1307        cmd.arg("foo") // key slot 12182
1308            .arg("bar") // key slot 5061
1309            .arg("baz") // key slot 4813
1310            .arg("{bar}vaz") // key slot 5061
1311            .arg("$.f.a"); // path
1312        let routing = RoutingInfo::for_routable(&cmd);
1313        let mut expected = std::collections::HashMap::new();
1314        expected.insert(Route(4813, SlotAddr::ReplicaOptional), vec![2, 4]);
1315        expected.insert(Route(5061, SlotAddr::ReplicaOptional), vec![1, 3, 4]);
1316        expected.insert(Route(12182, SlotAddr::ReplicaOptional), vec![0, 4]);
1317
1318        assert!(
1319            matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot((vec, args_pattern)), Some(ResponsePolicy::CombineArrays)))) if {
1320                let routes = vec.clone().into_iter().collect();
1321                expected == routes && args_pattern == MultiSlotArgPattern::KeysAndLastArg
1322            }),
1323            "expected={expected:?}\nrouting={routing:?}"
1324        );
1325    }
1326
1327    #[test]
1328    fn test_multi_shard_key_with_two_arg_triples() {
1329        let mut cmd = cmd("JSON.MSET");
1330        cmd
1331            .arg("foo") // key slot 12182
1332            .arg("$.a") // path
1333            .arg("bar") // value
1334            .arg("foo2") // key slot 1044
1335            .arg("$.f.a") // path
1336            .arg("bar2") // value
1337            .arg("{foo}foo3") // key slot 12182
1338            .arg("$.f.a") // path
1339            .arg("bar3"); // value
1340        let routing = RoutingInfo::for_routable(&cmd);
1341        let mut expected = std::collections::HashMap::new();
1342        expected.insert(Route(1044, SlotAddr::Master), vec![3, 4, 5]);
1343        expected.insert(Route(12182, SlotAddr::Master), vec![0, 1, 2, 6, 7, 8]);
1344
1345        assert!(
1346            matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot((vec, args_pattern)), Some(ResponsePolicy::AllSucceeded)))) if {
1347                let routes = vec.clone().into_iter().collect();
1348                expected == routes && args_pattern == MultiSlotArgPattern::KeyWithTwoArgTriples
1349            }),
1350            "expected={expected:?}\nrouting={routing:?}"
1351        );
1352    }
1353
1354    #[test]
1355    fn test_command_creation_for_multi_shard() {
1356        let mut original_cmd = cmd("DEL");
1357        original_cmd
1358            .arg("foo")
1359            .arg("bar")
1360            .arg("baz")
1361            .arg("{bar}vaz");
1362        let routing = RoutingInfo::for_routable(&original_cmd);
1363        let expected = [vec![0], vec![1, 3], vec![2]];
1364
1365        let mut indices: Vec<_> = match routing {
1366            Some(RoutingInfo::MultiNode((
1367                MultipleNodeRoutingInfo::MultiSlot((vec, MultiSlotArgPattern::KeysOnly)),
1368                _,
1369            ))) => vec.into_iter().map(|(_, indices)| indices).collect(),
1370            _ => panic!("unexpected routing: {routing:?}"),
1371        };
1372        indices.sort_by(|prev, next| prev.iter().next().unwrap().cmp(next.iter().next().unwrap())); // sorting because the `for_routable` doesn't return values in a consistent order between runs.
1373
1374        for (index, indices) in indices.into_iter().enumerate() {
1375            let cmd = command_for_multi_slot_indices(&original_cmd, indices.iter());
1376            let expected_indices = &expected[index];
1377            assert_eq!(original_cmd.arg_idx(0), cmd.arg_idx(0));
1378            for (index, target_index) in expected_indices.iter().enumerate() {
1379                let target_index = target_index + 1;
1380                assert_eq!(original_cmd.arg_idx(target_index), cmd.arg_idx(index + 1));
1381            }
1382        }
1383    }
1384
1385    #[test]
1386    fn test_combine_multi_shard_to_single_node_when_all_keys_are_in_same_slot() {
1387        let mut cmd = cmd("DEL");
1388        cmd.arg("foo").arg("{foo}bar").arg("{foo}baz");
1389        let routing = RoutingInfo::for_routable(&cmd);
1390
1391        assert!(
1392            matches!(
1393                routing,
1394                Some(RoutingInfo::SingleNode(
1395                    SingleNodeRoutingInfo::SpecificNode(Route(12182, SlotAddr::Master))
1396                ))
1397            ),
1398            "{routing:?}"
1399        );
1400    }
1401
1402    #[test]
1403    fn test_combining_results_into_single_array_only_keys() {
1404        // For example `MGET foo bar baz {baz}baz2 {bar}bar2 {foo}foo2`
1405        let res1 = Value::Array(vec![Value::Nil, Value::Okay]);
1406        let res2 = Value::Array(vec![
1407            Value::BulkString("1".as_bytes().to_vec()),
1408            Value::BulkString("4".as_bytes().to_vec()),
1409        ]);
1410        let res3 = Value::Array(vec![Value::SimpleString("2".to_string()), Value::Int(3)]);
1411        let results = super::combine_and_sort_array_results(
1412            vec![res1, res2, res3],
1413            &[
1414                (Route(4813, SlotAddr::Master), vec![2, 3]),
1415                (Route(5061, SlotAddr::Master), vec![1, 4]),
1416                (Route(12182, SlotAddr::Master), vec![0, 5]),
1417            ],
1418            &MultiSlotArgPattern::KeysOnly,
1419        );
1420
1421        assert_eq!(
1422            results.unwrap(),
1423            Value::Array(vec![
1424                Value::SimpleString("2".to_string()),
1425                Value::BulkString("1".as_bytes().to_vec()),
1426                Value::Nil,
1427                Value::Okay,
1428                Value::BulkString("4".as_bytes().to_vec()),
1429                Value::Int(3),
1430            ])
1431        );
1432    }
1433
1434    #[test]
1435    fn test_combining_results_into_single_array_key_value_paires() {
1436        // For example `MSET foo bar foo2 bar2 {foo}foo3 bar3`
1437        let res1 = Value::Array(vec![Value::Okay]);
1438        let res2 = Value::Array(vec![Value::BulkString("1".as_bytes().to_vec()), Value::Nil]);
1439        let results = super::combine_and_sort_array_results(
1440            vec![res1, res2],
1441            &[
1442                (Route(1044, SlotAddr::Master), vec![2, 3]),
1443                (Route(12182, SlotAddr::Master), vec![0, 1, 4, 5]),
1444            ],
1445            &MultiSlotArgPattern::KeyValuePairs,
1446        );
1447
1448        assert_eq!(
1449            results.unwrap(),
1450            Value::Array(vec![
1451                Value::BulkString("1".as_bytes().to_vec()),
1452                Value::Okay,
1453                Value::Nil
1454            ])
1455        );
1456    }
1457
1458    #[test]
1459    fn test_combining_results_into_single_array_keys_and_path() {
1460        // For example `JSON.MGET foo bar {foo}foo2 $.a`
1461        let res1 = Value::Array(vec![Value::Okay]);
1462        let res2 = Value::Array(vec![Value::BulkString("1".as_bytes().to_vec()), Value::Nil]);
1463        let results = super::combine_and_sort_array_results(
1464            vec![res1, res2],
1465            &[
1466                (Route(5061, SlotAddr::Master), vec![2, 3]),
1467                (Route(12182, SlotAddr::Master), vec![0, 1, 3]),
1468            ],
1469            &MultiSlotArgPattern::KeysAndLastArg,
1470        );
1471
1472        assert_eq!(
1473            results.unwrap(),
1474            Value::Array(vec![
1475                Value::BulkString("1".as_bytes().to_vec()),
1476                Value::Nil,
1477                Value::Okay,
1478            ])
1479        );
1480    }
1481
1482    #[test]
1483    fn test_combining_results_into_single_array_key_with_two_arg_triples() {
1484        // For example `JSON.MSET foo $.a bar foo2 $.f.a bar2 {foo}foo3 $.f bar3`
1485        let res1 = Value::Array(vec![Value::Okay]);
1486        let res2 = Value::Array(vec![Value::BulkString("1".as_bytes().to_vec()), Value::Nil]);
1487        let results = super::combine_and_sort_array_results(
1488            vec![res1, res2],
1489            &[
1490                (Route(5061, SlotAddr::Master), vec![3, 4, 5]),
1491                (Route(12182, SlotAddr::Master), vec![0, 1, 2, 6, 7, 8]),
1492            ],
1493            &MultiSlotArgPattern::KeyWithTwoArgTriples,
1494        );
1495
1496        assert_eq!(
1497            results.unwrap(),
1498            Value::Array(vec![
1499                Value::BulkString("1".as_bytes().to_vec()),
1500                Value::Okay,
1501                Value::Nil
1502            ])
1503        );
1504    }
1505
1506    #[test]
1507    fn test_combine_map_results() {
1508        let input = vec![];
1509        let result = super::combine_map_results(input).unwrap();
1510        assert_eq!(result, Value::Map(vec![]));
1511
1512        let input = vec![
1513            Value::Array(vec![
1514                Value::BulkString(b"key1".to_vec()),
1515                Value::Int(5),
1516                Value::BulkString(b"key2".to_vec()),
1517                Value::Int(10),
1518            ]),
1519            Value::Array(vec![
1520                Value::BulkString(b"key1".to_vec()),
1521                Value::Int(3),
1522                Value::BulkString(b"key3".to_vec()),
1523                Value::Int(15),
1524            ]),
1525        ];
1526        let result = super::combine_map_results(input).unwrap();
1527        let mut expected = vec![
1528            (Value::BulkString(b"key1".to_vec()), Value::Int(8)),
1529            (Value::BulkString(b"key2".to_vec()), Value::Int(10)),
1530            (Value::BulkString(b"key3".to_vec()), Value::Int(15)),
1531        ];
1532        expected.sort_unstable_by(|a, b| match (&a.0, &b.0) {
1533            (Value::BulkString(a_bytes), Value::BulkString(b_bytes)) => a_bytes.cmp(b_bytes),
1534            _ => std::cmp::Ordering::Equal,
1535        });
1536        let mut result_vec = match result {
1537            Value::Map(v) => v,
1538            _ => panic!("Expected Map"),
1539        };
1540        result_vec.sort_unstable_by(|a, b| match (&a.0, &b.0) {
1541            (Value::BulkString(a_bytes), Value::BulkString(b_bytes)) => a_bytes.cmp(b_bytes),
1542            _ => std::cmp::Ordering::Equal,
1543        });
1544        assert_eq!(result_vec, expected);
1545
1546        let input = vec![Value::Int(5)];
1547        let result = super::combine_map_results(input);
1548        assert!(result.is_err());
1549    }
1550}