1use rand::Rng;
2
3use crate::cluster_handling::slot_map::SLOT_SIZE;
4use crate::cmd::{Arg, Cmd};
5use crate::commands::is_readonly_cmd;
6use crate::types::Value;
7use crate::{ErrorKind, RedisError, RedisResult};
8use std::borrow::Cow;
9use std::cmp::min;
10use std::collections::HashMap;
11
12fn slot(key: &[u8]) -> u16 {
13 crc16::State::<crc16::XMODEM>::calculate(key) % SLOT_SIZE
14}
15
16pub(crate) fn get_slot(key: &[u8]) -> u16 {
18 let key = match get_hashtag(key) {
19 Some(tag) => tag,
20 None => key,
21 };
22
23 slot(key)
24}
25
/// A redirection instruction, in one of two flavours, each carrying a `String`.
///
/// NOTE(review): the payload is presumably the address of the node to retry
/// against — confirm with the code that constructs these values.
#[derive(Clone, PartialEq, Debug)]
pub(crate) enum Redirect {
    /// `MOVED`-style redirection.
    Moved(String),
    /// `ASK`-style redirection.
    Ask(String),
}
31
/// Logical operation used by `logical_aggregate` to merge per-node responses
/// element by element.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum LogicalAggregateOp {
    /// An element of the result is true only if it is positive in every response.
    And,
}
39
/// Numeric operation used by `aggregate` to merge integer responses into one value.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AggregateOp {
    /// Keep the smallest of the integers.
    Min,
    /// Add the integers together.
    Sum,
}
49
50#[derive(Debug, Clone, Copy, PartialEq)]
52pub enum ResponsePolicy {
53 OneSucceeded,
55 FirstSucceededNonEmptyOrAllEmpty,
57 AllSucceeded,
59 AggregateLogical(LogicalAggregateOp),
61 Aggregate(AggregateOp),
63 CombineArrays,
65 Special,
67 CombineMaps,
69}
70
71#[derive(Debug, Clone, PartialEq)]
73pub enum RoutingInfo {
74 SingleNode(SingleNodeRoutingInfo),
76 MultiNode((MultipleNodeRoutingInfo, Option<ResponsePolicy>)),
78}
79
80#[derive(Debug, Clone, PartialEq)]
82pub enum SingleNodeRoutingInfo {
83 Random,
85 RandomPrimary,
87 SpecificNode(Route),
89 ByAddress {
91 host: String,
93 port: u16,
95 },
96}
97
98impl From<Option<Route>> for SingleNodeRoutingInfo {
99 fn from(value: Option<Route>) -> Self {
100 value
101 .map(SingleNodeRoutingInfo::SpecificNode)
102 .unwrap_or(SingleNodeRoutingInfo::Random)
103 }
104}
105
106#[derive(Debug, Clone, PartialEq)]
108pub enum MultipleNodeRoutingInfo {
109 AllNodes,
111 AllMasters,
113 MultiSlot((Vec<(Route, Vec<usize>)>, MultiSlotArgPattern)),
120}
121
122pub(crate) fn command_for_multi_slot_indices<'a, 'b>(
125 original_cmd: &'a impl Routable,
126 indices: impl Iterator<Item = &'b usize> + 'a,
127) -> Cmd
128where
129 'b: 'a,
130{
131 let mut new_cmd = Cmd::new();
132 let command_length = 1; new_cmd.arg(original_cmd.arg_idx(0));
134 for index in indices {
135 new_cmd.arg(original_cmd.arg_idx(index + command_length));
136 }
137 new_cmd
138}
139
140pub(crate) fn aggregate(values: Vec<Value>, op: AggregateOp) -> RedisResult<Value> {
142 let initial_value = match op {
143 AggregateOp::Min => i64::MAX,
144 AggregateOp::Sum => 0,
145 };
146 let result = values.into_iter().try_fold(initial_value, |acc, curr| {
147 let int = match curr {
148 Value::Int(int) => int,
149 _ => {
150 return RedisResult::Err(
151 (
152 ErrorKind::TypeError,
153 "expected array of integers as response",
154 )
155 .into(),
156 );
157 }
158 };
159 let acc = match op {
160 AggregateOp::Min => min(acc, int),
161 AggregateOp::Sum => acc + int,
162 };
163 Ok(acc)
164 })?;
165 Ok(Value::Int(result))
166}
167
168pub(crate) fn logical_aggregate(values: Vec<Value>, op: LogicalAggregateOp) -> RedisResult<Value> {
170 let initial_value = match op {
171 LogicalAggregateOp::And => true,
172 };
173 let results = values.into_iter().try_fold(Vec::new(), |acc, curr| {
174 let values = match curr {
175 Value::Array(values) => values,
176 _ => {
177 return RedisResult::Err(
178 (
179 ErrorKind::TypeError,
180 "expected array of integers as response",
181 )
182 .into(),
183 );
184 }
185 };
186 let mut acc = if acc.is_empty() {
187 vec![initial_value; values.len()]
188 } else {
189 acc
190 };
191 for (index, value) in values.into_iter().enumerate() {
192 let int = match value {
193 Value::Int(int) => int,
194 _ => {
195 return Err((
196 ErrorKind::TypeError,
197 "expected array of integers as response",
198 )
199 .into());
200 }
201 };
202 acc[index] = match op {
203 LogicalAggregateOp::And => acc[index] && (int > 0),
204 };
205 }
206 Ok(acc)
207 })?;
208 Ok(Value::Array(
209 results
210 .into_iter()
211 .map(|result| Value::Int(result as i64))
212 .collect(),
213 ))
214}
215pub(crate) fn combine_map_results(values: Vec<Value>) -> RedisResult<Value> {
217 let mut map: HashMap<Vec<u8>, i64> = HashMap::new();
218
219 for value in values {
220 match value {
221 Value::Array(elements) => {
222 let mut iter = elements.into_iter();
223
224 while let Some(key) = iter.next() {
225 if let Value::BulkString(key_bytes) = key {
226 if let Some(Value::Int(value)) = iter.next() {
227 *map.entry(key_bytes).or_insert(0) += value;
228 } else {
229 return Err((ErrorKind::TypeError, "expected integer value").into());
230 }
231 } else {
232 return Err((ErrorKind::TypeError, "expected string key").into());
233 }
234 }
235 }
236 _ => {
237 return Err((ErrorKind::TypeError, "expected array of values as response").into());
238 }
239 }
240 }
241
242 let result_vec: Vec<(Value, Value)> = map
243 .into_iter()
244 .map(|(k, v)| (Value::BulkString(k), Value::Int(v)))
245 .collect();
246
247 Ok(Value::Map(result_vec))
248}
249
250pub(crate) fn combine_array_results(values: Vec<Value>) -> RedisResult<Value> {
252 let mut results = Vec::new();
253
254 for value in values {
255 match value {
256 Value::Array(values) => results.extend(values),
257 _ => {
258 return Err((ErrorKind::TypeError, "expected array of values as response").into());
259 }
260 }
261 }
262
263 Ok(Value::Array(results))
264}
265
266type MultiSlotResIdxIter<'a> = std::iter::Map<
269 std::slice::Iter<'a, (Route, Vec<usize>)>,
270 fn(&'a (Route, Vec<usize>)) -> Cow<'a, [usize]>,
271>;
272
273fn calculate_multi_slot_result_indices<'a>(
294 route_arg_indices: &'a [(Route, Vec<usize>)],
295 args_pattern: &MultiSlotArgPattern,
296) -> RedisResult<MultiSlotResIdxIter<'a>> {
297 let check_indices_input = |step_count: usize| {
298 for (_, indices) in route_arg_indices {
299 if indices.len() % step_count != 0 {
300 return Err(RedisError::from((
301 ErrorKind::ClientError,
302 "Invalid indices input detected",
303 format!(
304 "Expected argument pattern with tuples of size {step_count}, but found indices: {indices:?}"
305 ),
306 )));
307 }
308 }
309 Ok(())
310 };
311
312 match args_pattern {
313 MultiSlotArgPattern::KeysOnly => Ok(route_arg_indices
314 .iter()
315 .map(|(_, indices)| Cow::Borrowed(indices))),
316 MultiSlotArgPattern::KeysAndLastArg => {
317 Ok(route_arg_indices
319 .iter()
320 .map(|(_, indices)| Cow::Borrowed(&indices[..indices.len() - 1])))
321 }
322 MultiSlotArgPattern::KeyWithTwoArgTriples => {
323 check_indices_input(3)?;
327 Ok(route_arg_indices.iter().map(|(_, indices)| {
328 Cow::Owned(
329 indices
330 .iter()
331 .step_by(3)
332 .map(|idx| idx / 3)
333 .collect::<Vec<usize>>(),
334 )
335 }))
336 }
337 MultiSlotArgPattern::KeyValuePairs =>
338 {
342 check_indices_input(2)?;
343 Ok(route_arg_indices.iter().map(|(_, indices)| {
344 Cow::Owned(
345 indices
346 .iter()
347 .step_by(2)
348 .map(|idx| idx / 2)
349 .collect::<Vec<usize>>(),
350 )
351 }))
352 }
353 }
354}
355
356pub(crate) fn combine_and_sort_array_results(
378 values: Vec<Value>,
379 route_arg_indices: &[(Route, Vec<usize>)],
380 args_pattern: &MultiSlotArgPattern,
381) -> RedisResult<Value> {
382 let result_indices = calculate_multi_slot_result_indices(route_arg_indices, args_pattern)?;
383 let mut results = Vec::new();
384 results.resize(
385 values.iter().fold(0, |acc, value| match value {
386 Value::Array(values) => values.len() + acc,
387 _ => 0,
388 }),
389 Value::Nil,
390 );
391 if values.len() != result_indices.len() {
392 return Err(RedisError::from((
393 ErrorKind::ClientError,
394 "Mismatch in the number of multi-slot results compared to the expected result count.",
395 format!(
396 "Expected: {:?}, Found: {:?}",
397 values.len(),
398 result_indices.len()
399 ),
400 )));
401 }
402
403 for (key_indices, value) in result_indices.into_iter().zip(values) {
404 match value {
405 Value::Array(values) => {
406 assert_eq!(values.len(), key_indices.len());
407 for (index, value) in key_indices.iter().zip(values) {
408 results[*index] = value;
409 }
410 }
411 _ => {
412 return Err((ErrorKind::TypeError, "expected array of values as response").into());
413 }
414 }
415 }
416
417 Ok(Value::Array(results))
418}
419
420fn get_route(is_readonly: bool, key: &[u8]) -> Route {
421 let slot = get_slot(key);
422 if is_readonly {
423 Route::new(slot, SlotAddr::ReplicaOptional)
424 } else {
425 Route::new(slot, SlotAddr::Master)
426 }
427}
428
/// Layout of the arguments of a command whose keys may live in several slots.
/// `multi_shard` uses it to decide which arguments travel with which key.
#[derive(Debug, Clone, PartialEq)]
pub enum MultiSlotArgPattern {
    /// Every argument is a key (used for `MGET`, `DEL`, `EXISTS`, ...).
    KeysOnly,

    /// Arguments come in `key value` pairs (used for `MSET`).
    KeyValuePairs,

    /// A list of keys followed by one trailing argument that is sent to every
    /// route (used for `JSON.MGET`).
    KeysAndLastArg,

    /// Arguments come in triples of a key followed by two further arguments
    /// (used for `JSON.MSET`).
    KeyWithTwoArgTriples,
}
449
450fn multi_shard<R>(
471 routable: &R,
472 cmd: &[u8],
473 first_key_index: usize,
474 args_pattern: MultiSlotArgPattern,
475) -> Option<RoutingInfo>
476where
477 R: Routable + ?Sized,
478{
479 let is_readonly = is_readonly_cmd(cmd);
480 let mut routes = HashMap::new();
481 let mut curr_arg_idx = 0;
482 let incr_add_next_arg = |arg_indices: &mut Vec<usize>, mut curr_arg_idx: usize| {
483 curr_arg_idx += 1;
484 routable.arg_idx(curr_arg_idx)?;
486 arg_indices.push(curr_arg_idx);
487 Some(curr_arg_idx)
488 };
489 while let Some(arg) = routable.arg_idx(first_key_index + curr_arg_idx) {
490 let route = get_route(is_readonly, arg);
491 let arg_indices = routes.entry(route).or_insert(Vec::new());
492
493 arg_indices.push(curr_arg_idx);
494
495 match args_pattern {
496 MultiSlotArgPattern::KeysOnly => {} MultiSlotArgPattern::KeyValuePairs => {
498 curr_arg_idx = incr_add_next_arg(arg_indices, curr_arg_idx)?;
500 }
501 MultiSlotArgPattern::KeysAndLastArg => {
502 if routable
504 .arg_idx(first_key_index + curr_arg_idx + 2)
505 .is_none()
506 {
507 let path_idx = curr_arg_idx + 1;
509 for (_, arg_indices) in routes.iter_mut() {
510 arg_indices.push(path_idx);
511 }
512 break;
513 }
514 }
515 MultiSlotArgPattern::KeyWithTwoArgTriples => {
516 curr_arg_idx = incr_add_next_arg(arg_indices, curr_arg_idx)?;
518 curr_arg_idx = incr_add_next_arg(arg_indices, curr_arg_idx)?;
520 }
521 }
522 curr_arg_idx += 1;
523 }
524
525 let mut routes: Vec<(Route, Vec<usize>)> = routes.into_iter().collect();
526 if routes.is_empty() {
527 return None;
528 }
529
530 Some(if routes.len() == 1 {
531 RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(routes.pop().unwrap().0))
532 } else {
533 RoutingInfo::MultiNode((
534 MultipleNodeRoutingInfo::MultiSlot((routes, args_pattern)),
535 ResponsePolicy::for_command(cmd),
536 ))
537 })
538}
539
540impl ResponsePolicy {
541 pub(crate) fn for_command(cmd: &[u8]) -> Option<ResponsePolicy> {
543 match cmd {
544 b"SCRIPT EXISTS" => Some(ResponsePolicy::AggregateLogical(LogicalAggregateOp::And)),
545
546 b"DBSIZE" | b"DEL" | b"EXISTS" | b"SLOWLOG LEN" | b"TOUCH" | b"UNLINK"
547 | b"LATENCY RESET" | b"PUBSUB NUMPAT" => {
548 Some(ResponsePolicy::Aggregate(AggregateOp::Sum))
549 }
550
551 b"WAIT" => Some(ResponsePolicy::Aggregate(AggregateOp::Min)),
552
553 b"ACL SETUSER" | b"ACL DELUSER" | b"ACL SAVE" | b"CLIENT SETNAME"
554 | b"CLIENT SETINFO" | b"CONFIG SET" | b"CONFIG RESETSTAT" | b"CONFIG REWRITE"
555 | b"FLUSHALL" | b"FLUSHDB" | b"FUNCTION DELETE" | b"FUNCTION FLUSH"
556 | b"FUNCTION LOAD" | b"FUNCTION RESTORE" | b"MEMORY PURGE" | b"MSET" | b"JSON.MSET"
557 | b"PING" | b"SCRIPT FLUSH" | b"SCRIPT LOAD" | b"SLOWLOG RESET" | b"UNWATCH"
558 | b"WATCH" => Some(ResponsePolicy::AllSucceeded),
559
560 b"KEYS"
561 | b"FT._ALIASLIST"
562 | b"FT._LIST"
563 | b"MGET"
564 | b"JSON.MGET"
565 | b"SLOWLOG GET"
566 | b"PUBSUB CHANNELS"
567 | b"PUBSUB SHARDCHANNELS" => Some(ResponsePolicy::CombineArrays),
568
569 b"PUBSUB NUMSUB" | b"PUBSUB SHARDNUMSUB" => Some(ResponsePolicy::CombineMaps),
570
571 b"FUNCTION KILL" | b"SCRIPT KILL" => Some(ResponsePolicy::OneSucceeded),
572
573 b"RANDOMKEY" => Some(ResponsePolicy::FirstSucceededNonEmptyOrAllEmpty),
575
576 b"LATENCY GRAPH" | b"LATENCY HISTOGRAM" | b"LATENCY HISTORY" | b"LATENCY DOCTOR"
577 | b"LATENCY LATEST" => Some(ResponsePolicy::Special),
578
579 b"FUNCTION STATS" => Some(ResponsePolicy::Special),
580
581 b"MEMORY MALLOC-STATS" | b"MEMORY DOCTOR" | b"MEMORY STATS" => {
582 Some(ResponsePolicy::Special)
583 }
584
585 b"INFO" => Some(ResponsePolicy::Special),
586
587 _ => None,
588 }
589 }
590}
591
592enum RouteBy {
593 AllNodes,
594 AllPrimaries,
595 FirstKey,
596 MultiShard(MultiSlotArgPattern),
597 Random,
598 SecondArg,
599 SecondArgAfterKeyCount,
600 SecondArgSlot,
601 StreamsIndex,
602 ThirdArgAfterKeyCount,
603 Undefined,
604}
605
606fn base_routing(cmd: &[u8]) -> RouteBy {
607 match cmd {
608 b"ACL SETUSER"
609 | b"ACL DELUSER"
610 | b"ACL SAVE"
611 | b"CLIENT SETNAME"
612 | b"CLIENT SETINFO"
613 | b"SLOWLOG GET"
614 | b"SLOWLOG LEN"
615 | b"SLOWLOG RESET"
616 | b"CONFIG SET"
617 | b"CONFIG RESETSTAT"
618 | b"CONFIG REWRITE"
619 | b"SCRIPT FLUSH"
620 | b"SCRIPT LOAD"
621 | b"LATENCY RESET"
622 | b"LATENCY GRAPH"
623 | b"LATENCY HISTOGRAM"
624 | b"LATENCY HISTORY"
625 | b"LATENCY DOCTOR"
626 | b"LATENCY LATEST"
627 | b"PUBSUB NUMPAT"
628 | b"PUBSUB CHANNELS"
629 | b"PUBSUB NUMSUB"
630 | b"PUBSUB SHARDCHANNELS"
631 | b"PUBSUB SHARDNUMSUB"
632 | b"SCRIPT KILL"
633 | b"FUNCTION KILL"
634 | b"FUNCTION STATS" => RouteBy::AllNodes,
635
636 b"DBSIZE"
637 | b"DEBUG"
638 | b"FLUSHALL"
639 | b"FLUSHDB"
640 | b"FT._ALIASLIST"
641 | b"FT._LIST"
642 | b"FUNCTION DELETE"
643 | b"FUNCTION FLUSH"
644 | b"FUNCTION LOAD"
645 | b"FUNCTION RESTORE"
646 | b"INFO"
647 | b"KEYS"
648 | b"MEMORY DOCTOR"
649 | b"MEMORY MALLOC-STATS"
650 | b"MEMORY PURGE"
651 | b"MEMORY STATS"
652 | b"PING"
653 | b"SCRIPT EXISTS"
654 | b"UNWATCH"
655 | b"WAIT"
656 | b"RANDOMKEY"
657 | b"WAITAOF" => RouteBy::AllPrimaries,
658
659 b"MGET" | b"DEL" | b"EXISTS" | b"UNLINK" | b"TOUCH" | b"WATCH" => {
660 RouteBy::MultiShard(MultiSlotArgPattern::KeysOnly)
661 }
662
663 b"MSET" => RouteBy::MultiShard(MultiSlotArgPattern::KeyValuePairs),
664 b"JSON.MGET" => RouteBy::MultiShard(MultiSlotArgPattern::KeysAndLastArg),
665 b"JSON.MSET" => RouteBy::MultiShard(MultiSlotArgPattern::KeyWithTwoArgTriples),
666 b"SCAN" | b"SHUTDOWN" | b"SLAVEOF" | b"REPLICAOF" => RouteBy::Undefined,
668
669 b"BLMPOP" | b"BZMPOP" | b"EVAL" | b"EVALSHA" | b"EVALSHA_RO" | b"EVAL_RO" | b"FCALL"
670 | b"FCALL_RO" => RouteBy::ThirdArgAfterKeyCount,
671
672 b"BITOP"
673 | b"MEMORY USAGE"
674 | b"PFDEBUG"
675 | b"XGROUP CREATE"
676 | b"XGROUP CREATECONSUMER"
677 | b"XGROUP DELCONSUMER"
678 | b"XGROUP DESTROY"
679 | b"XGROUP SETID"
680 | b"XINFO CONSUMERS"
681 | b"XINFO GROUPS"
682 | b"XINFO STREAM"
683 | b"OBJECT ENCODING"
684 | b"OBJECT FREQ"
685 | b"OBJECT IDLETIME"
686 | b"OBJECT REFCOUNT"
687 | b"JSON.DEBUG" => RouteBy::SecondArg,
688
689 b"LMPOP" | b"SINTERCARD" | b"ZDIFF" | b"ZINTER" | b"ZINTERCARD" | b"ZMPOP" | b"ZUNION" => {
690 RouteBy::SecondArgAfterKeyCount
691 }
692
693 b"XREAD" | b"XREADGROUP" => RouteBy::StreamsIndex,
694
695 b"ACL DRYRUN"
698 | b"ACL GENPASS"
699 | b"ACL GETUSER"
700 | b"ACL HELP"
701 | b"ACL LIST"
702 | b"ACL LOG"
703 | b"ACL USERS"
704 | b"ACL WHOAMI"
705 | b"AUTH"
706 | b"BGSAVE"
707 | b"CLIENT GETNAME"
708 | b"CLIENT GETREDIR"
709 | b"CLIENT ID"
710 | b"CLIENT INFO"
711 | b"CLIENT KILL"
712 | b"CLIENT PAUSE"
713 | b"CLIENT REPLY"
714 | b"CLIENT TRACKINGINFO"
715 | b"CLIENT UNBLOCK"
716 | b"CLIENT UNPAUSE"
717 | b"CLUSTER COUNT-FAILURE-REPORTS"
718 | b"CLUSTER INFO"
719 | b"CLUSTER KEYSLOT"
720 | b"CLUSTER MEET"
721 | b"CLUSTER MYSHARDID"
722 | b"CLUSTER NODES"
723 | b"CLUSTER REPLICAS"
724 | b"CLUSTER RESET"
725 | b"CLUSTER SET-CONFIG-EPOCH"
726 | b"CLUSTER SHARDS"
727 | b"CLUSTER SLOTS"
728 | b"COMMAND COUNT"
729 | b"COMMAND GETKEYS"
730 | b"COMMAND LIST"
731 | b"COMMAND"
732 | b"CONFIG GET"
733 | b"ECHO"
734 | b"FUNCTION LIST"
735 | b"LASTSAVE"
736 | b"LOLWUT"
737 | b"MODULE LIST"
738 | b"MODULE LOAD"
739 | b"MODULE LOADEX"
740 | b"MODULE UNLOAD"
741 | b"READONLY"
742 | b"READWRITE"
743 | b"SAVE"
744 | b"SCRIPT SHOW"
745 | b"TFCALL"
746 | b"TFCALLASYNC"
747 | b"TFUNCTION DELETE"
748 | b"TFUNCTION LIST"
749 | b"TFUNCTION LOAD"
750 | b"TIME" => RouteBy::Random,
751
752 b"CLUSTER ADDSLOTS"
753 | b"CLUSTER COUNTKEYSINSLOT"
754 | b"CLUSTER DELSLOTS"
755 | b"CLUSTER DELSLOTSRANGE"
756 | b"CLUSTER GETKEYSINSLOT"
757 | b"CLUSTER SETSLOT" => RouteBy::SecondArgSlot,
758
759 _ => RouteBy::FirstKey,
760 }
761}
762
763impl RoutingInfo {
764 pub(crate) fn for_routable<R>(r: &R) -> Option<RoutingInfo>
766 where
767 R: Routable + ?Sized,
768 {
769 let cmd = &r.command()?[..];
770 match base_routing(cmd) {
771 RouteBy::AllNodes => Some(RoutingInfo::MultiNode((
772 MultipleNodeRoutingInfo::AllNodes,
773 ResponsePolicy::for_command(cmd),
774 ))),
775
776 RouteBy::AllPrimaries => Some(RoutingInfo::MultiNode((
777 MultipleNodeRoutingInfo::AllMasters,
778 ResponsePolicy::for_command(cmd),
779 ))),
780
781 RouteBy::MultiShard(arg_pattern) => multi_shard(r, cmd, 1, arg_pattern),
782
783 RouteBy::Random => Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)),
784
785 RouteBy::ThirdArgAfterKeyCount => {
786 let key_count = r
787 .arg_idx(2)
788 .and_then(|x| std::str::from_utf8(x).ok())
789 .and_then(|x| x.parse::<u64>().ok())?;
790 if key_count == 0 {
791 Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
792 } else {
793 r.arg_idx(3).map(|key| RoutingInfo::for_key(cmd, key))
794 }
795 }
796
797 RouteBy::SecondArg => r.arg_idx(2).map(|key| RoutingInfo::for_key(cmd, key)),
798
799 RouteBy::SecondArgAfterKeyCount => {
800 let key_count = r
801 .arg_idx(1)
802 .and_then(|x| std::str::from_utf8(x).ok())
803 .and_then(|x| x.parse::<u64>().ok())?;
804 if key_count == 0 {
805 Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
806 } else {
807 r.arg_idx(2).map(|key| RoutingInfo::for_key(cmd, key))
808 }
809 }
810
811 RouteBy::StreamsIndex => {
812 let streams_position = r.position(b"STREAMS")?;
813 r.arg_idx(streams_position + 1)
814 .map(|key| RoutingInfo::for_key(cmd, key))
815 }
816
817 RouteBy::SecondArgSlot => r
818 .arg_idx(2)
819 .and_then(|arg| std::str::from_utf8(arg).ok())
820 .and_then(|slot| slot.parse::<u16>().ok())
821 .map(|slot| {
822 RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new(
823 slot,
824 SlotAddr::Master,
825 )))
826 }),
827
828 RouteBy::FirstKey => match r.arg_idx(1) {
829 Some(key) => Some(RoutingInfo::for_key(cmd, key)),
830 None => Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)),
831 },
832
833 RouteBy::Undefined => None,
834 }
835 }
836
837 fn for_key(cmd: &[u8], key: &[u8]) -> RoutingInfo {
838 RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(get_route(
839 is_readonly_cmd(cmd),
840 key,
841 )))
842 }
843}
844
/// Something that exposes the arguments of a command so it can be routed.
pub(crate) trait Routable {
    /// Returns the upper-cased command name, or `None` when argument 0 is
    /// missing.
    ///
    /// For the command families listed below, the second argument (when
    /// present) is treated as a sub-command and appended, upper-cased, after a
    /// single space, e.g. `xgroup create` becomes `XGROUP CREATE`.
    fn command(&self) -> Option<Vec<u8>> {
        let mut name = self.arg_idx(0)?.to_ascii_uppercase();

        let takes_subcommand = matches!(
            name.as_slice(),
            b"XGROUP"
                | b"OBJECT"
                | b"SLOWLOG"
                | b"FUNCTION"
                | b"MODULE"
                | b"COMMAND"
                | b"PUBSUB"
                | b"CONFIG"
                | b"MEMORY"
                | b"XINFO"
                | b"CLIENT"
                | b"ACL"
                | b"SCRIPT"
                | b"CLUSTER"
                | b"LATENCY"
        );

        if takes_subcommand {
            if let Some(subcommand) = self.arg_idx(1) {
                name.reserve(subcommand.len() + 1);
                name.push(b' ');
                name.extend(subcommand.iter().map(u8::to_ascii_uppercase));
            }
        }

        Some(name)
    }

    /// Returns the bytes of the argument at `idx` (argument 0 is the command
    /// name), or `None` when there is no such argument.
    fn arg_idx(&self, idx: usize) -> Option<&[u8]>;

    /// Returns the index of the first argument equal to `candidate`, or `None`
    /// when no argument matches.
    fn position(&self, candidate: &[u8]) -> Option<usize>;
}
880
881impl Routable for Cmd {
882 fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
883 self.arg_idx(idx)
884 }
885
886 fn position(&self, candidate: &[u8]) -> Option<usize> {
887 self.args_iter().position(|a| match a {
888 Arg::Simple(d) => d.eq_ignore_ascii_case(candidate),
889 _ => false,
890 })
891 }
892}
893
894impl Routable for Value {
895 fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
896 match self {
897 Value::Array(args) => match args.get(idx) {
898 Some(Value::BulkString(ref data)) => Some(&data[..]),
899 _ => None,
900 },
901 _ => None,
902 }
903 }
904
905 fn position(&self, candidate: &[u8]) -> Option<usize> {
906 match self {
907 Value::Array(args) => args.iter().position(|a| match a {
908 Value::BulkString(d) => d.eq_ignore_ascii_case(candidate),
909 _ => false,
910 }),
911 _ => None,
912 }
913 }
914}
915
/// A range of slots together with the nodes that serve it.
///
/// NOTE(review): `start` and `end` are presumably the inclusive bounds of the
/// range, and the node strings presumably addresses — confirm with the code
/// that builds `Slot` values.
#[derive(Debug, Hash, Clone)]
pub(crate) struct Slot {
    /// First slot of the range.
    pub(crate) start: u16,
    /// Last slot of the range.
    pub(crate) end: u16,
    /// The primary node of the range.
    pub(crate) master: String,
    /// The replica nodes of the range.
    pub(crate) replicas: Vec<String>,
}

impl Slot {
    /// Borrows the primary node of this range.
    #[allow(dead_code)]
    pub(crate) fn master(&self) -> &str {
        &self.master
    }

    /// Returns an owned copy of the replica list of this range.
    #[allow(dead_code)]
    pub(crate) fn replicas(&self) -> Vec<String> {
        self.replicas.to_vec()
    }
}
935
/// Which kind of node within a slot a request should be sent to.
#[derive(Eq, PartialEq, Clone, Copy, Debug, Hash)]
pub enum SlotAddr {
    /// The primary node of the slot; chosen for commands that are not read-only.
    Master,
    /// Chosen for read-only commands.
    /// NOTE(review): presumably "a replica if available, otherwise the
    /// primary" — confirm with the slot map lookup.
    ReplicaOptional,
    /// NOTE(review): presumably "a replica only" — not produced in this part
    /// of the file; confirm with the slot map lookup.
    ReplicaRequired,
}
948
949#[derive(Eq, PartialEq, Clone, Copy, Debug, Hash)]
952pub struct Route(u16, SlotAddr);
953
954impl Route {
955 pub fn new(slot: u16, slot_addr: SlotAddr) -> Self {
957 Self(slot, slot_addr)
958 }
959
960 pub(crate) fn slot(&self) -> u16 {
962 self.0
963 }
964
965 pub(crate) fn slot_addr(&self) -> SlotAddr {
967 self.1
968 }
969
970 pub(crate) fn new_random_primary() -> Self {
972 Self::new(random_slot(), SlotAddr::Master)
973 }
974}
975
976fn random_slot() -> u16 {
978 let mut rng = rand::rng();
979 rng.random_range(0..SLOT_SIZE)
980}
981
/// Extracts the hashtag of `key`: the bytes between the first `{` and the
/// first `}` that follows it.
///
/// Returns `None` when there is no `{`, no `}` after it, or nothing between
/// the two.
fn get_hashtag(key: &[u8]) -> Option<&[u8]> {
    // Index of the first byte after the opening brace.
    let start = key.iter().position(|&byte| byte == b'{')? + 1;
    // Number of bytes between the opening brace and the closing one.
    let len = key[start..].iter().position(|&byte| byte == b'}')?;

    if len == 0 {
        None
    } else {
        Some(&key[start..start + len])
    }
}
990
991#[cfg(test)]
992mod tests_routing {
993 use super::{
994 command_for_multi_slot_indices, AggregateOp, MultiSlotArgPattern, MultipleNodeRoutingInfo,
995 ResponsePolicy, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr,
996 };
997 use crate::cluster_routing::slot;
998 use crate::{cmd, parser::parse_redis_value, Value};
999 use core::panic;
1000
1001 #[test]
1002 fn test_routing_info_mixed_capatalization() {
1003 let mut upper = cmd("XREAD");
1004 upper.arg("STREAMS").arg("foo").arg(0);
1005
1006 let mut lower = cmd("xread");
1007 lower.arg("streams").arg("foo").arg(0);
1008
1009 assert_eq!(
1010 RoutingInfo::for_routable(&upper).unwrap(),
1011 RoutingInfo::for_routable(&lower).unwrap()
1012 );
1013
1014 let mut mixed = cmd("xReAd");
1015 mixed.arg("StReAmS").arg("foo").arg(0);
1016
1017 assert_eq!(
1018 RoutingInfo::for_routable(&lower).unwrap(),
1019 RoutingInfo::for_routable(&mixed).unwrap()
1020 );
1021 }
1022
1023 #[test]
1024 fn test_routing_info() {
1025 let mut test_cmds = vec![];
1026
1027 let mut test_cmd = cmd("FLUSHALL");
1029 test_cmd.arg("");
1030 test_cmds.push(test_cmd);
1031
1032 test_cmd = cmd("ECHO");
1034 test_cmd.arg("");
1035 test_cmds.push(test_cmd);
1036
1037 test_cmd = cmd("SET");
1039 test_cmd.arg("42");
1040 test_cmds.push(test_cmd);
1041
1042 test_cmd = cmd("XINFO");
1044 test_cmd.arg("GROUPS").arg("FOOBAR");
1045 test_cmds.push(test_cmd);
1046
1047 test_cmd = cmd("EVAL");
1049 test_cmd.arg("FOO").arg("0").arg("BAR");
1050 test_cmds.push(test_cmd);
1051
1052 test_cmd = cmd("EVAL");
1054 test_cmd.arg("FOO").arg("4").arg("BAR");
1055 test_cmds.push(test_cmd);
1056
1057 test_cmd = cmd("XREAD");
1059 test_cmd.arg("STREAMS").arg("4");
1060 test_cmds.push(test_cmd);
1061
1062 test_cmd = cmd("XREAD");
1064 test_cmd.arg("FOO").arg("STREAMS").arg("4");
1065 test_cmds.push(test_cmd);
1066
1067 for cmd in test_cmds {
1068 let value = parse_redis_value(&cmd.get_packed_command()).unwrap();
1069 assert_eq!(
1070 RoutingInfo::for_routable(&value).unwrap(),
1071 RoutingInfo::for_routable(&cmd).unwrap(),
1072 );
1073 }
1074
1075 for cmd in [cmd("FLUSHALL"), cmd("FLUSHDB"), cmd("PING")] {
1078 assert_eq!(
1079 RoutingInfo::for_routable(&cmd),
1080 Some(RoutingInfo::MultiNode((
1081 MultipleNodeRoutingInfo::AllMasters,
1082 Some(ResponsePolicy::AllSucceeded)
1083 )))
1084 );
1085 }
1086
1087 assert_eq!(
1088 RoutingInfo::for_routable(&cmd("DBSIZE")),
1089 Some(RoutingInfo::MultiNode((
1090 MultipleNodeRoutingInfo::AllMasters,
1091 Some(ResponsePolicy::Aggregate(AggregateOp::Sum))
1092 )))
1093 );
1094
1095 assert_eq!(
1096 RoutingInfo::for_routable(&cmd("SCRIPT KILL")),
1097 Some(RoutingInfo::MultiNode((
1098 MultipleNodeRoutingInfo::AllNodes,
1099 Some(ResponsePolicy::OneSucceeded)
1100 )))
1101 );
1102
1103 assert_eq!(
1104 RoutingInfo::for_routable(&cmd("INFO")),
1105 Some(RoutingInfo::MultiNode((
1106 MultipleNodeRoutingInfo::AllMasters,
1107 Some(ResponsePolicy::Special)
1108 )))
1109 );
1110
1111 assert_eq!(
1112 RoutingInfo::for_routable(&cmd("KEYS")),
1113 Some(RoutingInfo::MultiNode((
1114 MultipleNodeRoutingInfo::AllMasters,
1115 Some(ResponsePolicy::CombineArrays)
1116 )))
1117 );
1118
1119 for cmd in vec![
1120 cmd("SCAN"),
1121 cmd("SHUTDOWN"),
1122 cmd("SLAVEOF"),
1123 cmd("REPLICAOF"),
1124 ] {
1125 assert_eq!(
1126 RoutingInfo::for_routable(&cmd),
1127 None,
1128 "{}",
1129 std::str::from_utf8(cmd.arg_idx(0).unwrap()).unwrap()
1130 );
1131 }
1132
1133 for cmd in [
1134 cmd("EVAL").arg(r#"redis.call("PING");"#).arg(0),
1135 cmd("EVALSHA").arg(r#"redis.call("PING");"#).arg(0),
1136 ] {
1137 assert_eq!(
1138 RoutingInfo::for_routable(cmd),
1139 Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
1140 );
1141 }
1142
1143 assert_eq!(
1145 RoutingInfo::for_routable(cmd("FCALL").arg("foo").arg(1).arg("mykey")),
1146 Some(RoutingInfo::SingleNode(
1147 SingleNodeRoutingInfo::SpecificNode(Route::new(slot(b"mykey"), SlotAddr::Master))
1148 ))
1149 );
1150
1151 for (cmd, expected) in [
1152 (
1153 cmd("EVAL")
1154 .arg(r#"redis.call("GET, KEYS[1]");"#)
1155 .arg(1)
1156 .arg("foo"),
1157 Some(RoutingInfo::SingleNode(
1158 SingleNodeRoutingInfo::SpecificNode(Route::new(slot(b"foo"), SlotAddr::Master)),
1159 )),
1160 ),
1161 (
1162 cmd("XGROUP")
1163 .arg("CREATE")
1164 .arg("mystream")
1165 .arg("workers")
1166 .arg("$")
1167 .arg("MKSTREAM"),
1168 Some(RoutingInfo::SingleNode(
1169 SingleNodeRoutingInfo::SpecificNode(Route::new(
1170 slot(b"mystream"),
1171 SlotAddr::Master,
1172 )),
1173 )),
1174 ),
1175 (
1176 cmd("XINFO").arg("GROUPS").arg("foo"),
1177 Some(RoutingInfo::SingleNode(
1178 SingleNodeRoutingInfo::SpecificNode(Route::new(
1179 slot(b"foo"),
1180 SlotAddr::ReplicaOptional,
1181 )),
1182 )),
1183 ),
1184 (
1185 cmd("XREADGROUP")
1186 .arg("GROUP")
1187 .arg("wkrs")
1188 .arg("consmrs")
1189 .arg("STREAMS")
1190 .arg("mystream"),
1191 Some(RoutingInfo::SingleNode(
1192 SingleNodeRoutingInfo::SpecificNode(Route::new(
1193 slot(b"mystream"),
1194 SlotAddr::Master,
1195 )),
1196 )),
1197 ),
1198 (
1199 cmd("XREAD")
1200 .arg("COUNT")
1201 .arg("2")
1202 .arg("STREAMS")
1203 .arg("mystream")
1204 .arg("writers")
1205 .arg("0-0")
1206 .arg("0-0"),
1207 Some(RoutingInfo::SingleNode(
1208 SingleNodeRoutingInfo::SpecificNode(Route::new(
1209 slot(b"mystream"),
1210 SlotAddr::ReplicaOptional,
1211 )),
1212 )),
1213 ),
1214 ] {
1215 assert_eq!(
1216 RoutingInfo::for_routable(cmd),
1217 expected,
1218 "{}",
1219 std::str::from_utf8(cmd.arg_idx(0).unwrap()).unwrap()
1220 );
1221 }
1222 }
1223
1224 #[test]
1225 fn test_slot_for_packed_cmd() {
1226 assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[
1227 42, 50, 13, 10, 36, 54, 13, 10, 69, 88, 73, 83, 84, 83, 13, 10, 36, 49, 54, 13, 10,
1228 244, 93, 23, 40, 126, 127, 253, 33, 89, 47, 185, 204, 171, 249, 96, 139, 13, 10
1229 ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::ReplicaOptional)))) if slot == 964));
1230
1231 assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[
1232 42, 54, 13, 10, 36, 51, 13, 10, 83, 69, 84, 13, 10, 36, 49, 54, 13, 10, 36, 241,
1233 197, 111, 180, 254, 5, 175, 143, 146, 171, 39, 172, 23, 164, 145, 13, 10, 36, 52,
1234 13, 10, 116, 114, 117, 101, 13, 10, 36, 50, 13, 10, 78, 88, 13, 10, 36, 50, 13, 10,
1235 80, 88, 13, 10, 36, 55, 13, 10, 49, 56, 48, 48, 48, 48, 48, 13, 10
1236 ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Master)))) if slot == 8352));
1237
1238 assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[
1239 42, 54, 13, 10, 36, 51, 13, 10, 83, 69, 84, 13, 10, 36, 49, 54, 13, 10, 169, 233,
1240 247, 59, 50, 247, 100, 232, 123, 140, 2, 101, 125, 221, 66, 170, 13, 10, 36, 52,
1241 13, 10, 116, 114, 117, 101, 13, 10, 36, 50, 13, 10, 78, 88, 13, 10, 36, 50, 13, 10,
1242 80, 88, 13, 10, 36, 55, 13, 10, 49, 56, 48, 48, 48, 48, 48, 13, 10
1243 ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Master)))) if slot == 5210));
1244 }
1245
1246 #[test]
1247 fn test_multi_shard_keys_only() {
1248 let mut cmd = cmd("DEL");
1249 cmd.arg("foo").arg("bar").arg("baz").arg("{bar}vaz");
1250 let routing = RoutingInfo::for_routable(&cmd);
1251 let mut expected = std::collections::HashMap::new();
1252 expected.insert(Route(4813, SlotAddr::Master), vec![2]);
1253 expected.insert(Route(5061, SlotAddr::Master), vec![1, 3]);
1254 expected.insert(Route(12182, SlotAddr::Master), vec![0]);
1255
1256 assert!(
1257 matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot((vec, args_pattern)), Some(ResponsePolicy::Aggregate(AggregateOp::Sum))))) if {
1258 let routes = vec.clone().into_iter().collect();
1259 expected == routes && args_pattern == MultiSlotArgPattern::KeysOnly
1260 }),
1261 "expected={expected:?}\nrouting={routing:?}"
1262 );
1263
1264 let mut cmd = crate::cmd("MGET");
1265 cmd.arg("foo").arg("bar").arg("baz").arg("{bar}vaz");
1266 let routing = RoutingInfo::for_routable(&cmd);
1267 let mut expected = std::collections::HashMap::new();
1268 expected.insert(Route(4813, SlotAddr::ReplicaOptional), vec![2]);
1269 expected.insert(Route(5061, SlotAddr::ReplicaOptional), vec![1, 3]);
1270 expected.insert(Route(12182, SlotAddr::ReplicaOptional), vec![0]);
1271
1272 assert!(
1273 matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot((vec, args_pattern)), Some(ResponsePolicy::CombineArrays)))) if {
1274 let routes = vec.clone().into_iter().collect();
1275 expected == routes && args_pattern == MultiSlotArgPattern::KeysOnly
1276 }),
1277 "expected={expected:?}\nrouting={routing:?}"
1278 );
1279 }
1280
1281 #[test]
1282 fn test_multi_shard_key_value_pairs() {
1283 let mut cmd = cmd("MSET");
1284 cmd.arg("foo") .arg("bar") .arg("foo2") .arg("bar2") .arg("{foo}foo3") .arg("bar3"); let routing = RoutingInfo::for_routable(&cmd);
1291 let mut expected = std::collections::HashMap::new();
1292 expected.insert(Route(1044, SlotAddr::Master), vec![2, 3]);
1293 expected.insert(Route(12182, SlotAddr::Master), vec![0, 1, 4, 5]);
1294
1295 assert!(
1296 matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot((vec, args_pattern)), Some(ResponsePolicy::AllSucceeded)))) if {
1297 let routes = vec.clone().into_iter().collect();
1298 expected == routes && args_pattern == MultiSlotArgPattern::KeyValuePairs
1299 }),
1300 "expected={expected:?}\nrouting={routing:?}"
1301 );
1302 }
1303
// JSON.MGET takes a list of keys followed by a single path; the path (the last argument) is
// expected to be attached to every slot group.
#[test]
fn test_multi_shard_keys_and_path() {
    let mut json_mget = cmd("JSON.MGET");
    // Indices 0..=3 are keys, index 4 is the shared path.
    for argument in ["foo", "bar", "baz", "{bar}vaz", "$.f.a"] {
        json_mget.arg(argument);
    }
    let routing = RoutingInfo::for_routable(&json_mget);

    // `bar` and `{bar}vaz` are expected in the same slot; index 4 appears in every group.
    let expected: std::collections::HashMap<Route, Vec<usize>> =
        std::collections::HashMap::from([
            (Route(4813, SlotAddr::ReplicaOptional), vec![2, 4]),
            (Route(5061, SlotAddr::ReplicaOptional), vec![1, 3, 4]),
            (Route(12182, SlotAddr::ReplicaOptional), vec![0, 4]),
        ]);

    let routed_as_expected = match &routing {
        Some(RoutingInfo::MultiNode((
            MultipleNodeRoutingInfo::MultiSlot((
                slot_groups,
                MultiSlotArgPattern::KeysAndLastArg,
            )),
            Some(ResponsePolicy::CombineArrays),
        ))) => {
            // Compared as maps so that the order of the slot groups does not matter.
            let routes: std::collections::HashMap<Route, Vec<usize>> =
                slot_groups.iter().cloned().collect();
            routes == expected
        }
        _ => false,
    };

    assert!(
        routed_as_expected,
        "expected={expected:?}\nrouting={routing:?}"
    );
}
1326
// JSON.MSET takes (key, path, value) triples; each triple must stay intact when the arguments
// are split between slots.
#[test]
fn test_multi_shard_key_with_two_arg_triples() {
    let mut json_mset = cmd("JSON.MSET");
    // Three triples occupying argument indices 0..=2, 3..=5 and 6..=8.
    let triples = [
        ["foo", "$.a", "bar"],
        ["foo2", "$.f.a", "bar2"],
        ["{foo}foo3", "$.f.a", "bar3"],
    ];
    for triple in triples {
        for argument in triple {
            json_mset.arg(argument);
        }
    }
    let routing = RoutingInfo::for_routable(&json_mset);

    // `foo` and `{foo}foo3` are expected in slot 12182, `foo2` in slot 1044.
    let expected: std::collections::HashMap<Route, Vec<usize>> =
        std::collections::HashMap::from([
            (Route(1044, SlotAddr::Master), vec![3, 4, 5]),
            (Route(12182, SlotAddr::Master), vec![0, 1, 2, 6, 7, 8]),
        ]);

    let routed_as_expected = match &routing {
        Some(RoutingInfo::MultiNode((
            MultipleNodeRoutingInfo::MultiSlot((
                slot_groups,
                MultiSlotArgPattern::KeyWithTwoArgTriples,
            )),
            Some(ResponsePolicy::AllSucceeded),
        ))) => {
            // Compared as maps so that the order of the slot groups does not matter.
            let routes: std::collections::HashMap<Route, Vec<usize>> =
                slot_groups.iter().cloned().collect();
            routes == expected
        }
        _ => false,
    };

    assert!(
        routed_as_expected,
        "expected={expected:?}\nrouting={routing:?}"
    );
}
1353
// Checks that `command_for_multi_slot_indices` builds, for every slot group of a multi-slot
// DEL, a sub-command that starts with the original command name and carries exactly the keys
// selected by that group's indices, in order.
#[test]
fn test_command_creation_for_multi_shard() {
    let mut original_cmd = cmd("DEL");
    original_cmd
        .arg("foo")
        .arg("bar")
        .arg("baz")
        .arg("{bar}vaz");
    let routing = RoutingInfo::for_routable(&original_cmd);
    // Expected key indices per slot group, ordered by the first index of each group.
    let expected = [vec![0], vec![1, 3], vec![2]];

    let mut slot_groups: Vec<_> = match routing {
        Some(RoutingInfo::MultiNode((
            MultipleNodeRoutingInfo::MultiSlot((routes, MultiSlotArgPattern::KeysOnly)),
            _,
        ))) => routes
            .into_iter()
            .map(|(_, key_indices)| key_indices)
            .collect(),
        _ => panic!("unexpected routing: {routing:?}"),
    };
    // Sort by each group's first key index so the groups line up positionally with `expected`.
    slot_groups.sort_by_key(|group| *group.first().expect("slot group without key indices"));

    // The loop below is driven by the produced groups; without this check a routing that
    // yields fewer groups than expected would pass without ever being compared.
    assert_eq!(
        slot_groups.len(),
        expected.len(),
        "unexpected number of slot groups: {slot_groups:?}"
    );

    for (group_position, group) in slot_groups.into_iter().enumerate() {
        let cmd = command_for_multi_slot_indices(&original_cmd, group.iter());
        let expected_indices = &expected[group_position];
        // Argument 0 is the command name and must be carried over unchanged.
        assert_eq!(original_cmd.arg_idx(0), cmd.arg_idx(0));
        for (position, target_index) in expected_indices.iter().enumerate() {
            // Key indices are relative to the first argument after the command name.
            let target_index = target_index + 1;
            assert_eq!(
                original_cmd.arg_idx(target_index),
                cmd.arg_idx(position + 1)
            );
        }
    }
}
1384
// When every key of a multi-key command maps to the same slot, the routing is expected to be
// a single-node route to that slot instead of a multi-slot route.
#[test]
fn test_combine_multi_shard_to_single_node_when_all_keys_are_in_same_slot() {
    let mut del = cmd("DEL");
    // All three keys are expected to hash to the slot of `foo`.
    for key in ["foo", "{foo}bar", "{foo}baz"] {
        del.arg(key);
    }
    let routing = RoutingInfo::for_routable(&del);

    let expected = Some(RoutingInfo::SingleNode(
        SingleNodeRoutingInfo::SpecificNode(Route(12182, SlotAddr::Master)),
    ));

    assert!(routing == expected, "{routing:?}");
}
1401
// Per-node array responses of a keys-only command must be merged into one array in which each
// value sits at the position of the key it belongs to in the original command.
#[test]
fn test_combining_results_into_single_array_only_keys() {
    let bulk = |text: &str| Value::BulkString(text.as_bytes().to_vec());

    // One response array per route, listed in the same order as `sorting_order`.
    let responses = vec![
        Value::Array(vec![Value::Nil, Value::Okay]),
        Value::Array(vec![bulk("1"), bulk("4")]),
        Value::Array(vec![Value::SimpleString(String::from("2")), Value::Int(3)]),
    ];
    // Original key indices served by each route.
    let sorting_order: [(Route, Vec<usize>); 3] = [
        (Route(4813, SlotAddr::Master), vec![2, 3]),
        (Route(5061, SlotAddr::Master), vec![1, 4]),
        (Route(12182, SlotAddr::Master), vec![0, 5]),
    ];

    let results = super::combine_and_sort_array_results(
        responses,
        &sorting_order,
        &MultiSlotArgPattern::KeysOnly,
    );

    let expected = Value::Array(vec![
        Value::SimpleString(String::from("2")),
        bulk("1"),
        Value::Nil,
        Value::Okay,
        bulk("4"),
        Value::Int(3),
    ]);
    assert_eq!(results.unwrap(), expected);
}
1433
// For key/value-pair commands each route's indices cover both keys and values, yet the merged
// array is expected to contain exactly one entry per pair, in original pair order.
#[test]
fn test_combining_results_into_single_array_key_value_paires() {
    // One response array per route, listed in the same order as `sorting_order`.
    let responses = vec![
        Value::Array(vec![Value::Okay]),
        Value::Array(vec![Value::BulkString(b"1".to_vec()), Value::Nil]),
    ];
    // The first route holds the second pair; the second route holds the first and third.
    let sorting_order: [(Route, Vec<usize>); 2] = [
        (Route(1044, SlotAddr::Master), vec![2, 3]),
        (Route(12182, SlotAddr::Master), vec![0, 1, 4, 5]),
    ];

    let results = super::combine_and_sort_array_results(
        responses,
        &sorting_order,
        &MultiSlotArgPattern::KeyValuePairs,
    );

    let expected = Value::Array(vec![
        Value::BulkString(b"1".to_vec()),
        Value::Okay,
        Value::Nil,
    ]);
    assert_eq!(results.unwrap(), expected);
}
1457
// For "keys followed by one shared last argument" commands, the shared argument index shows up
// in every route's indices but must not produce an entry of its own in the merged array.
#[test]
fn test_combining_results_into_single_array_keys_and_path() {
    // One response array per route, listed in the same order as `sorting_order`.
    let responses = vec![
        Value::Array(vec![Value::Okay]),
        Value::Array(vec![Value::BulkString(b"1".to_vec()), Value::Nil]),
    ];
    // Index 3 is the shared last argument; indices 0..=2 are the keys.
    let sorting_order: [(Route, Vec<usize>); 2] = [
        (Route(5061, SlotAddr::Master), vec![2, 3]),
        (Route(12182, SlotAddr::Master), vec![0, 1, 3]),
    ];

    let results = super::combine_and_sort_array_results(
        responses,
        &sorting_order,
        &MultiSlotArgPattern::KeysAndLastArg,
    );

    let expected = Value::Array(vec![
        Value::BulkString(b"1".to_vec()),
        Value::Nil,
        Value::Okay,
    ]);
    assert_eq!(results.unwrap(), expected);
}
1481
// For commands built from (key, arg, arg) triples, each route's indices span whole triples and
// the merged array is expected to contain one entry per triple, in original triple order.
#[test]
fn test_combining_results_into_single_array_key_with_two_arg_triples() {
    // One response array per route, listed in the same order as `sorting_order`.
    let responses = vec![
        Value::Array(vec![Value::Okay]),
        Value::Array(vec![Value::BulkString(b"1".to_vec()), Value::Nil]),
    ];
    // The first route holds the second triple; the second route holds the first and third.
    let sorting_order: [(Route, Vec<usize>); 2] = [
        (Route(5061, SlotAddr::Master), vec![3, 4, 5]),
        (Route(12182, SlotAddr::Master), vec![0, 1, 2, 6, 7, 8]),
    ];

    let results = super::combine_and_sort_array_results(
        responses,
        &sorting_order,
        &MultiSlotArgPattern::KeyWithTwoArgTriples,
    );

    let expected = Value::Array(vec![
        Value::BulkString(b"1".to_vec()),
        Value::Okay,
        Value::Nil,
    ]);
    assert_eq!(results.unwrap(), expected);
}
1505
// Exercises `combine_map_results` with three inputs: no responses, two flat key/value arrays
// with an overlapping key, and a response that is not an array.
#[test]
fn test_combine_map_results() {
    // Orders map entries by their bulk-string key; any other key kind compares as equal.
    fn by_bulk_key(left: &(Value, Value), right: &(Value, Value)) -> std::cmp::Ordering {
        match (&left.0, &right.0) {
            (Value::BulkString(left_key), Value::BulkString(right_key)) => {
                left_key.cmp(right_key)
            }
            _ => std::cmp::Ordering::Equal,
        }
    }
    let bulk = |bytes: &[u8]| Value::BulkString(bytes.to_vec());

    // Scenario 1: no responses combine into an empty map.
    let empty_result = super::combine_map_results(Vec::new()).unwrap();
    assert_eq!(empty_result, Value::Map(Vec::new()));

    // Scenario 2: `key1` appears in both responses and its counts are expected to be summed.
    let responses = vec![
        Value::Array(vec![bulk(b"key1"), Value::Int(5), bulk(b"key2"), Value::Int(10)]),
        Value::Array(vec![bulk(b"key1"), Value::Int(3), bulk(b"key3"), Value::Int(15)]),
    ];
    let combined = super::combine_map_results(responses).unwrap();

    let mut expected = vec![
        (bulk(b"key1"), Value::Int(8)),
        (bulk(b"key2"), Value::Int(10)),
        (bulk(b"key3"), Value::Int(15)),
    ];
    expected.sort_unstable_by(by_bulk_key);

    let mut result_vec = match combined {
        Value::Map(entries) => entries,
        _ => panic!("Expected Map"),
    };
    // Both sides are sorted by key so the comparison does not depend on the map's entry order.
    result_vec.sort_unstable_by(by_bulk_key);
    assert_eq!(result_vec, expected);

    // Scenario 3: a response that is not an array is rejected.
    let invalid_result = super::combine_map_results(vec![Value::Int(5)]);
    assert!(invalid_result.is_err());
}
1550}