1use std::cmp::min;
2use std::collections::{BTreeMap, HashMap, HashSet};
3
4use rand::prelude::IndexedRandom;
5use rand::rng;
6
7use crate::cmd::{Arg, Cmd};
8use crate::commands::is_readonly_cmd;
9use crate::types::Value;
10use crate::{ErrorKind, RedisResult};
11
/// Modulus applied to a key's CRC16 checksum when computing its slot, so
/// every slot number falls in `0..SLOT_SIZE`.
pub(crate) const SLOT_SIZE: u16 = 16384;
13
14fn slot(key: &[u8]) -> u16 {
15 crc16::State::<crc16::XMODEM>::calculate(key) % SLOT_SIZE
16}
17
/// A redirection carrying a single `String` payload.
///
/// NOTE(review): the variants are not constructed or consumed anywhere in this
/// file; the payload is presumably the address of the node to retry against
/// (as in the cluster `MOVED` / `ASK` replies) — confirm against the callers.
#[derive(Clone, PartialEq, Debug)]
pub(crate) enum Redirect {
    /// Presumably corresponds to a `MOVED` redirection — confirm.
    Moved(String),
    /// Presumably corresponds to an `ASK` redirection — confirm.
    Ask(String),
}
23
/// Boolean operation used by `logical_aggregate` to merge several array
/// responses element by element.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum LogicalAggregateOp {
    /// An element of the result is true only if that element is a positive
    /// integer in every response.
    And,
}
31
/// Numeric operation used by `aggregate` to reduce several integer responses
/// into one.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AggregateOp {
    /// Keep the smallest of the integers.
    Min,
    /// Add the integers together.
    Sum,
}
41
/// How the responses of a command that was sent to several nodes should be
/// merged into a single response.
///
/// `ResponsePolicy::for_command` assigns a policy to each known multi-node
/// command. NOTE(review): the code that applies most of these policies is not
/// in this file, so the per-variant descriptions below are inferred from the
/// variant names and the helper functions visible here — confirm against the
/// cluster client.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ResponsePolicy {
    /// Presumably: a single successful response is enough.
    OneSucceeded,
    /// Presumably: a single successful, non-empty response is enough.
    OneSucceededNonEmpty,
    /// Presumably: every node has to succeed.
    AllSucceeded,
    /// Merge array responses element-wise with a logical operation
    /// (see `logical_aggregate`).
    AggregateLogical(LogicalAggregateOp),
    /// Reduce integer responses with a numeric operation (see `aggregate`).
    Aggregate(AggregateOp),
    /// Concatenate array responses (see `combine_array_results` and
    /// `combine_and_sort_array_results`).
    CombineArrays,
    /// The command needs handling that is specific to it; nothing in this file
    /// implements it.
    Special,
}
60
/// Where a request should be sent, as computed by `RoutingInfo::for_routable`.
#[derive(Debug, Clone, PartialEq)]
pub enum RoutingInfo {
    /// The request goes to exactly one node.
    SingleNode(SingleNodeRoutingInfo),
    /// The request goes to several nodes; the optional [`ResponsePolicy`]
    /// describes how their responses are merged (`None` when no policy is
    /// registered for the command).
    MultiNode((MultipleNodeRoutingInfo, Option<ResponsePolicy>)),
}
69
/// Selects the single node a request is sent to.
#[derive(Debug, Clone, PartialEq)]
pub enum SingleNodeRoutingInfo {
    /// No particular node is required.
    Random,
    /// The node serving the slot and address kind described by the [`Route`].
    SpecificNode(Route),
    /// A node identified directly by host and port.
    /// NOTE(review): never constructed in this file — confirm how callers use it.
    ByAddress {
        /// Host part of the node address.
        host: String,
        /// Port part of the node address.
        port: u16,
    },
}
85
86impl From<Option<Route>> for SingleNodeRoutingInfo {
87 fn from(value: Option<Route>) -> Self {
88 value
89 .map(SingleNodeRoutingInfo::SpecificNode)
90 .unwrap_or(SingleNodeRoutingInfo::Random)
91 }
92}
93
/// Selects the set of nodes a multi-node request is sent to.
#[derive(Debug, Clone, PartialEq)]
pub enum MultipleNodeRoutingInfo {
    /// Every node (see `SlotMap::addresses_for_all_nodes`).
    AllNodes,
    /// Every primary (see `SlotMap::addresses_for_all_primaries`).
    AllMasters,
    /// One sub-request per route. Each route is paired with the argument
    /// offsets that belong to it, counted from the command's first key
    /// argument (this is how `multi_shard` builds the list).
    MultiSlot(Vec<(Route, Vec<usize>)>),
}
104
105pub fn command_for_multi_slot_indices<'a, 'b>(
110 original_cmd: &'a impl Routable,
111 indices: impl Iterator<Item = &'b usize> + 'a,
112) -> Cmd
113where
114 'b: 'a,
115{
116 let mut new_cmd = Cmd::new();
117 let command_length = 1; new_cmd.arg(original_cmd.arg_idx(0));
119 for index in indices {
120 new_cmd.arg(original_cmd.arg_idx(index + command_length));
121 }
122 new_cmd
123}
124
125pub(crate) fn aggregate(values: Vec<Value>, op: AggregateOp) -> RedisResult<Value> {
126 let initial_value = match op {
127 AggregateOp::Min => i64::MAX,
128 AggregateOp::Sum => 0,
129 };
130 let result = values.into_iter().try_fold(initial_value, |acc, curr| {
131 let int = match curr {
132 Value::Int(int) => int,
133 _ => {
134 return RedisResult::Err(
135 (
136 ErrorKind::TypeError,
137 "expected array of integers as response",
138 )
139 .into(),
140 );
141 }
142 };
143 let acc = match op {
144 AggregateOp::Min => min(acc, int),
145 AggregateOp::Sum => acc + int,
146 };
147 Ok(acc)
148 })?;
149 Ok(Value::Int(result))
150}
151
152pub(crate) fn logical_aggregate(values: Vec<Value>, op: LogicalAggregateOp) -> RedisResult<Value> {
153 let initial_value = match op {
154 LogicalAggregateOp::And => true,
155 };
156 let results = values.into_iter().try_fold(Vec::new(), |acc, curr| {
157 let values = match curr {
158 Value::Array(values) => values,
159 _ => {
160 return RedisResult::Err(
161 (
162 ErrorKind::TypeError,
163 "expected array of integers as response",
164 )
165 .into(),
166 );
167 }
168 };
169 let mut acc = if acc.is_empty() {
170 vec![initial_value; values.len()]
171 } else {
172 acc
173 };
174 for (index, value) in values.into_iter().enumerate() {
175 let int = match value {
176 Value::Int(int) => int,
177 _ => {
178 return Err((
179 ErrorKind::TypeError,
180 "expected array of integers as response",
181 )
182 .into());
183 }
184 };
185 acc[index] = match op {
186 LogicalAggregateOp::And => acc[index] && (int > 0),
187 };
188 }
189 Ok(acc)
190 })?;
191 Ok(Value::Array(
192 results
193 .into_iter()
194 .map(|result| Value::Int(result as i64))
195 .collect(),
196 ))
197}
198
199pub(crate) fn combine_array_results(values: Vec<Value>) -> RedisResult<Value> {
200 let mut results = Vec::new();
201
202 for value in values {
203 match value {
204 Value::Array(values) => results.extend(values),
205 _ => {
206 return Err((ErrorKind::TypeError, "expected array of values as response").into());
207 }
208 }
209 }
210
211 Ok(Value::Array(results))
212}
213
214pub(crate) fn combine_and_sort_array_results<'a>(
219 values: Vec<Value>,
220 sorting_order: impl ExactSizeIterator<Item = &'a Vec<usize>>,
221) -> RedisResult<Value> {
222 let mut results = Vec::new();
223 results.resize(
224 values.iter().fold(0, |acc, value| match value {
225 Value::Array(values) => values.len() + acc,
226 _ => 0,
227 }),
228 Value::Nil,
229 );
230 assert_eq!(values.len(), sorting_order.len());
231
232 for (key_indices, value) in sorting_order.into_iter().zip(values) {
233 match value {
234 Value::Array(values) => {
235 assert_eq!(values.len(), key_indices.len());
236 for (index, value) in key_indices.iter().zip(values) {
237 results[*index] = value;
238 }
239 }
240 _ => {
241 return Err((ErrorKind::TypeError, "expected array of values as response").into());
242 }
243 }
244 }
245
246 Ok(Value::Array(results))
247}
248
249pub fn get_slot(key: &[u8]) -> u16 {
251 let key = match get_hashtag(key) {
252 Some(tag) => tag,
253 None => key,
254 };
255
256 slot(key)
257}
258
259fn get_route(is_readonly: bool, key: &[u8]) -> Route {
260 let slot = get_slot(key);
261 if is_readonly {
262 Route::new(slot, SlotAddr::ReplicaOptional)
263 } else {
264 Route::new(slot, SlotAddr::Master)
265 }
266}
267
268fn multi_shard<R>(
279 routable: &R,
280 cmd: &[u8],
281 first_key_index: usize,
282 has_values: bool,
283) -> Option<RoutingInfo>
284where
285 R: Routable + ?Sized,
286{
287 let is_readonly = is_readonly_cmd(cmd);
288 let mut routes = HashMap::new();
289 let mut key_index = 0;
290 while let Some(key) = routable.arg_idx(first_key_index + key_index) {
291 let route = get_route(is_readonly, key);
292 let entry = routes.entry(route);
293 let keys = entry.or_insert(Vec::new());
294 keys.push(key_index);
295
296 if has_values {
297 key_index += 1;
298 routable.arg_idx(first_key_index + key_index)?; keys.push(key_index);
300 }
301 key_index += 1;
302 }
303
304 let mut routes: Vec<(Route, Vec<usize>)> = routes.into_iter().collect();
305 Some(if routes.len() == 1 {
306 RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(routes.pop().unwrap().0))
307 } else {
308 RoutingInfo::MultiNode((
309 MultipleNodeRoutingInfo::MultiSlot(routes),
310 ResponsePolicy::for_command(cmd),
311 ))
312 })
313}
314
impl ResponsePolicy {
    /// Looks up the response policy registered for `cmd`.
    ///
    /// `cmd` is compared byte-for-byte against upper-case command names;
    /// two-word commands are spelled with a single space between the words
    /// (for example `b"SCRIPT EXISTS"`), which is the form produced by
    /// `Routable::command`. Returns `None` for every command not listed here.
    pub fn for_command(cmd: &[u8]) -> Option<ResponsePolicy> {
        match cmd {
            // Element-wise logical AND over the per-node arrays.
            b"SCRIPT EXISTS" => Some(ResponsePolicy::AggregateLogical(LogicalAggregateOp::And)),

            // Integer replies that are added up.
            b"DBSIZE" | b"DEL" | b"EXISTS" | b"SLOWLOG LEN" | b"TOUCH" | b"UNLINK"
            | b"LATENCY RESET" => Some(ResponsePolicy::Aggregate(AggregateOp::Sum)),

            // Integer replies of which the minimum is kept.
            b"WAIT" => Some(ResponsePolicy::Aggregate(AggregateOp::Min)),

            b"ACL SETUSER" | b"ACL DELUSER" | b"ACL SAVE" | b"CLIENT SETNAME"
            | b"CLIENT SETINFO" | b"CONFIG SET" | b"CONFIG RESETSTAT" | b"CONFIG REWRITE"
            | b"FLUSHALL" | b"FLUSHDB" | b"FUNCTION DELETE" | b"FUNCTION FLUSH"
            | b"FUNCTION LOAD" | b"FUNCTION RESTORE" | b"MEMORY PURGE" | b"MSET" | b"PING"
            | b"SCRIPT FLUSH" | b"SCRIPT LOAD" | b"SLOWLOG RESET" => {
                Some(ResponsePolicy::AllSucceeded)
            }

            // Array replies that are concatenated.
            b"KEYS" | b"MGET" | b"SLOWLOG GET" => Some(ResponsePolicy::CombineArrays),

            b"FUNCTION KILL" | b"SCRIPT KILL" => Some(ResponsePolicy::OneSucceeded),

            b"RANDOMKEY" => Some(ResponsePolicy::OneSucceededNonEmpty),

            // The remaining arms all map to `Special`; they are kept as separate
            // groups per command family.
            b"LATENCY GRAPH" | b"LATENCY HISTOGRAM" | b"LATENCY HISTORY" | b"LATENCY DOCTOR"
            | b"LATENCY LATEST" => Some(ResponsePolicy::Special),

            b"FUNCTION STATS" => Some(ResponsePolicy::Special),

            b"MEMORY MALLOC-STATS" | b"MEMORY DOCTOR" | b"MEMORY STATS" => {
                Some(ResponsePolicy::Special)
            }

            b"INFO" => Some(ResponsePolicy::Special),

            _ => None,
        }
    }
}
356
impl RoutingInfo {
    /// Computes the routing for the command held by `r`.
    ///
    /// The command name comes from `Routable::command` (upper-cased, with the
    /// sub-command appended for multi-word commands) and is matched against
    /// the tables below.
    ///
    /// Returns `None` when:
    /// * `r` has no command name;
    /// * the command is one of `SCAN`, `SHUTDOWN`, `SLAVEOF`, `REPLICAOF`,
    ///   `MOVE`, `BITOP`;
    /// * an argument needed for routing is missing or cannot be parsed (the
    ///   slot number of the `CLUSTER ...SLOT...` commands, the key count or
    ///   first key of `EVAL` / `EVALSHA`, the key of the `XGROUP` / `XINFO` /
    ///   `PUBSUB ...NUMSUB` commands, the `STREAMS` keyword or the key after
    ///   it for `XREAD` / `XREADGROUP`);
    /// * `MSET` has a key without a value (see `multi_shard`).
    pub fn for_routable<R>(r: &R) -> Option<RoutingInfo>
    where
        R: Routable + ?Sized,
    {
        let cmd = &r.command()?[..];
        match cmd {
            // Sent to every primary.
            b"RANDOMKEY"
            | b"KEYS"
            | b"SCRIPT EXISTS"
            | b"WAIT"
            | b"DBSIZE"
            | b"FLUSHALL"
            | b"FUNCTION RESTORE"
            | b"FUNCTION DELETE"
            | b"FUNCTION FLUSH"
            | b"FUNCTION LOAD"
            | b"PING"
            | b"FLUSHDB"
            | b"MEMORY PURGE"
            | b"FUNCTION KILL"
            | b"SCRIPT KILL"
            | b"FUNCTION STATS"
            | b"MEMORY MALLOC-STATS"
            | b"MEMORY DOCTOR"
            | b"MEMORY STATS"
            | b"INFO" => Some(RoutingInfo::MultiNode((
                MultipleNodeRoutingInfo::AllMasters,
                ResponsePolicy::for_command(cmd),
            ))),

            // Sent to every node, replicas included.
            b"ACL SETUSER" | b"ACL DELUSER" | b"ACL SAVE" | b"CLIENT SETNAME"
            | b"CLIENT SETINFO" | b"SLOWLOG GET" | b"SLOWLOG LEN" | b"SLOWLOG RESET"
            | b"CONFIG SET" | b"CONFIG RESETSTAT" | b"CONFIG REWRITE" | b"SCRIPT FLUSH"
            | b"SCRIPT LOAD" | b"LATENCY RESET" | b"LATENCY GRAPH" | b"LATENCY HISTOGRAM"
            | b"LATENCY HISTORY" | b"LATENCY DOCTOR" | b"LATENCY LATEST" => {
                Some(RoutingInfo::MultiNode((
                    MultipleNodeRoutingInfo::AllNodes,
                    ResponsePolicy::for_command(cmd),
                )))
            }

            // Sent to a single, arbitrary node. These have to be listed
            // explicitly: the fallback arm at the bottom would otherwise treat
            // their first argument as a key.
            b"ACL DRYRUN"
            | b"ACL GENPASS"
            | b"ACL GETUSER"
            | b"ACL HELP"
            | b"ACL LIST"
            | b"ACL LOG"
            | b"ACL USERS"
            | b"ACL WHOAMI"
            | b"AUTH"
            | b"TIME"
            | b"PUBSUB CHANNELS"
            | b"PUBSUB NUMPAT"
            | b"PUBSUB SHARDCHANNELS"
            | b"BGSAVE"
            | b"WAITAOF"
            | b"SAVE"
            | b"LASTSAVE"
            | b"CLIENT TRACKINGINFO"
            | b"CLIENT PAUSE"
            | b"CLIENT UNPAUSE"
            | b"CLIENT UNBLOCK"
            | b"CLIENT ID"
            | b"CLIENT REPLY"
            | b"CLIENT GETNAME"
            | b"CLIENT GETREDIR"
            | b"CLIENT INFO"
            | b"CLIENT KILL"
            | b"CLUSTER INFO"
            | b"CLUSTER MEET"
            | b"CLUSTER MYSHARDID"
            | b"CLUSTER NODES"
            | b"CLUSTER REPLICAS"
            | b"CLUSTER RESET"
            | b"CLUSTER SET-CONFIG-EPOCH"
            | b"CLUSTER SLOTS"
            | b"CLUSTER SHARDS"
            | b"CLUSTER COUNT-FAILURE-REPORTS"
            | b"CLUSTER KEYSLOT"
            | b"COMMAND"
            | b"COMMAND COUNT"
            | b"COMMAND LIST"
            | b"COMMAND GETKEYS"
            | b"CONFIG GET"
            | b"DEBUG"
            | b"ECHO"
            | b"READONLY"
            | b"READWRITE"
            | b"TFUNCTION LOAD"
            | b"TFUNCTION DELETE"
            | b"TFUNCTION LIST"
            | b"TFCALL"
            | b"TFCALLASYNC"
            | b"MODULE LIST"
            | b"MODULE LOAD"
            | b"MODULE UNLOAD"
            | b"MODULE LOADEX" => Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)),

            // Argument 2 is itself a slot number; route to the primary of that
            // slot. An absent or non-numeric argument yields `None`.
            b"CLUSTER COUNTKEYSINSLOT"
            | b"CLUSTER GETKEYSINSLOT"
            | b"CLUSTER SETSLOT"
            | b"CLUSTER DELSLOTS"
            | b"CLUSTER DELSLOTSRANGE" => r
                .arg_idx(2)
                .and_then(|arg| std::str::from_utf8(arg).ok())
                .and_then(|slot| slot.parse::<u16>().ok())
                .map(|slot| {
                    RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new(
                        slot,
                        SlotAddr::Master,
                    )))
                }),

            // Multi-key commands: every argument from index 1 on is a key
            // (MSET: alternating key / value), possibly spanning several slots.
            b"MGET" | b"DEL" | b"EXISTS" | b"UNLINK" | b"TOUCH" => multi_shard(r, cmd, 1, false),
            b"MSET" => multi_shard(r, cmd, 1, true),
            // No routing is produced for these commands.
            b"SCAN" | b"SHUTDOWN" | b"SLAVEOF" | b"REPLICAOF" | b"MOVE" | b"BITOP" => None,
            // Argument 2 is the key count; with zero keys any node will do,
            // otherwise the first key (argument 3) decides the slot.
            b"EVALSHA" | b"EVAL" => {
                let key_count = r
                    .arg_idx(2)
                    .and_then(|x| std::str::from_utf8(x).ok())
                    .and_then(|x| x.parse::<u64>().ok())?;
                if key_count == 0 {
                    Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
                } else {
                    r.arg_idx(3).map(|key| RoutingInfo::for_key(cmd, key))
                }
            }
            // Two-word commands: the key follows the sub-command, at index 2.
            b"XGROUP CREATE"
            | b"XGROUP CREATECONSUMER"
            | b"XGROUP DELCONSUMER"
            | b"XGROUP DESTROY"
            | b"XGROUP SETID"
            | b"XINFO CONSUMERS"
            | b"XINFO GROUPS"
            | b"XINFO STREAM"
            | b"PUBSUB SHARDNUMSUB"
            | b"PUBSUB NUMSUB" => r.arg_idx(2).map(|key| RoutingInfo::for_key(cmd, key)),
            // The first key is the argument right after the `STREAMS` keyword.
            b"XREAD" | b"XREADGROUP" => {
                let streams_position = r.position(b"STREAMS")?;
                r.arg_idx(streams_position + 1)
                    .map(|key| RoutingInfo::for_key(cmd, key))
            }
            // Fallback: treat argument 1 as the key; a command without
            // arguments can go to any node.
            _ => match r.arg_idx(1) {
                Some(key) => Some(RoutingInfo::for_key(cmd, key)),
                None => Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)),
            },
        }
    }

    /// Single-node routing to the slot of `key`, using a replica-optional
    /// route when `cmd` is a read-only command and a primary route otherwise.
    fn for_key(cmd: &[u8], key: &[u8]) -> RoutingInfo {
        RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(get_route(
            is_readonly_cmd(cmd),
            key,
        )))
    }
}
518
/// Something that exposes the arguments of a command so that routing can be
/// derived from it.
pub trait Routable {
    /// Returns the upper-cased command name, or `None` if there is no
    /// argument 0.
    ///
    /// For command families that take a sub-command (`XGROUP`, `CLUSTER`,
    /// `CONFIG`, ...), argument 1 is upper-cased too and appended after a
    /// single space, e.g. `b"XINFO GROUPS"`. If such a command has no
    /// argument 1, only the family name is returned.
    fn command(&self) -> Option<Vec<u8>> {
        let mut name = self.arg_idx(0)?.to_ascii_uppercase();

        let takes_subcommand = matches!(
            name.as_slice(),
            b"XGROUP"
                | b"OBJECT"
                | b"SLOWLOG"
                | b"FUNCTION"
                | b"MODULE"
                | b"COMMAND"
                | b"PUBSUB"
                | b"CONFIG"
                | b"MEMORY"
                | b"XINFO"
                | b"CLIENT"
                | b"ACL"
                | b"SCRIPT"
                | b"CLUSTER"
                | b"LATENCY"
        );
        if !takes_subcommand {
            return Some(name);
        }

        if let Some(subcommand) = self.arg_idx(1) {
            name.reserve(subcommand.len() + 1);
            name.push(b' ');
            name.extend(subcommand.iter().map(u8::to_ascii_uppercase));
        }
        Some(name)
    }

    /// Returns the bytes of the argument at `idx` (index 0 is the command
    /// name), or `None` if there is no such argument.
    fn arg_idx(&self, idx: usize) -> Option<&[u8]>;

    /// Returns the index of the first argument equal to `candidate`, or `None`
    /// if there is none. How arguments are compared is up to the implementor.
    fn position(&self, candidate: &[u8]) -> Option<usize>;
}
554
555impl Routable for Cmd {
556 fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
557 self.arg_idx(idx)
558 }
559
560 fn position(&self, candidate: &[u8]) -> Option<usize> {
561 self.args_iter().position(|a| match a {
562 Arg::Simple(d) => d.eq_ignore_ascii_case(candidate),
563 _ => false,
564 })
565 }
566}
567
568impl Routable for Value {
569 fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
570 match self {
571 Value::Array(args) => match args.get(idx) {
572 Some(Value::BulkString(ref data)) => Some(&data[..]),
573 _ => None,
574 },
575 _ => None,
576 }
577 }
578
579 fn position(&self, candidate: &[u8]) -> Option<usize> {
580 match self {
581 Value::Array(args) => args.iter().position(|a| match a {
582 Value::BulkString(d) => d.eq_ignore_ascii_case(candidate),
583 _ => false,
584 }),
585 _ => None,
586 }
587 }
588}
589
/// A contiguous range of slots together with the nodes serving it.
#[derive(Debug, PartialEq)]
pub(crate) struct Slot {
    /// First slot of the range.
    pub(crate) start: u16,
    /// Last slot of the range (inclusive, as used by `SlotMap`).
    pub(crate) end: u16,
    /// Address of the primary.
    pub(crate) master: String,
    /// Addresses of the replicas.
    pub(crate) replicas: Vec<String>,
}

impl Slot {
    /// Creates the range `s..=e` served by primary `m` and replicas `r`.
    pub fn new(s: u16, e: u16, m: String, r: Vec<String>) -> Self {
        let (start, end, master, replicas) = (s, e, m, r);
        Slot {
            start,
            end,
            master,
            replicas,
        }
    }
}
608
/// Which node of a slot a request should go to. `SlotAddrs::slot_addr`
/// resolves it to a concrete address.
#[derive(Eq, PartialEq, Clone, Copy, Debug, Hash)]
pub enum SlotAddr {
    /// Always the primary.
    Master,
    /// A replica when reading from replicas is enabled, otherwise the primary.
    ReplicaOptional,
    /// A replica regardless of the read-from-replica setting; the primary is
    /// used only when the slot has no replicas.
    ReplicaRequired,
}
621
/// The addresses of the nodes serving one slot range: a single primary and
/// any number of replicas.
#[derive(Debug)]
pub(crate) struct SlotAddrs {
    /// Address of the primary.
    primary: String,
    /// Addresses of the replicas; may be empty.
    replicas: Vec<String>,
}
631
632impl SlotAddrs {
633 pub(crate) fn new(primary: String, replicas: Vec<String>) -> Self {
634 Self { primary, replicas }
635 }
636
637 fn get_replica_node(&self) -> &str {
638 self.replicas.choose(&mut rng()).unwrap_or(&self.primary)
639 }
640
641 pub(crate) fn slot_addr(&self, slot_addr: &SlotAddr, read_from_replica: bool) -> &str {
642 match slot_addr {
643 SlotAddr::Master => &self.primary,
644 SlotAddr::ReplicaOptional => {
645 if read_from_replica {
646 self.get_replica_node()
647 } else {
648 &self.primary
649 }
650 }
651 SlotAddr::ReplicaRequired => self.get_replica_node(),
652 }
653 }
654
655 pub(crate) fn from_slot(slot: Slot) -> Self {
656 SlotAddrs::new(slot.master, slot.replicas)
657 }
658}
659
660impl<'a> IntoIterator for &'a SlotAddrs {
661 type Item = &'a String;
662 type IntoIter = std::iter::Chain<std::iter::Once<&'a String>, std::slice::Iter<'a, String>>;
663
664 fn into_iter(
665 self,
666 ) -> std::iter::Chain<std::iter::Once<&'a String>, std::slice::Iter<'a, String>> {
667 std::iter::once(&self.primary).chain(self.replicas.iter())
668 }
669}
670
/// Value stored in `SlotMap` for one slot range. The range's end slot is the
/// map key, so only the start needs to be kept here.
#[derive(Debug)]
struct SlotMapValue {
    /// First slot of the range.
    start: u16,
    /// Nodes serving the range.
    addrs: SlotAddrs,
}
676
677impl SlotMapValue {
678 fn from_slot(slot: Slot) -> Self {
679 Self {
680 start: slot.start,
681 addrs: SlotAddrs::from_slot(slot),
682 }
683 }
684}
685
/// Lookup table from slot number to the nodes serving it.
#[derive(Debug, Default)]
pub(crate) struct SlotMap {
    /// Slot ranges keyed by their end slot, so a range query starting at a
    /// slot finds the first range that could contain it.
    slots: BTreeMap<u16, SlotMapValue>,
    /// Whether `SlotAddr::ReplicaOptional` routes resolve to a replica.
    read_from_replica: bool,
}
691
692impl SlotMap {
693 pub fn new(read_from_replica: bool) -> Self {
694 Self {
695 slots: Default::default(),
696 read_from_replica,
697 }
698 }
699
700 pub fn from_slots(slots: Vec<Slot>, read_from_replica: bool) -> Self {
701 Self {
702 slots: slots
703 .into_iter()
704 .map(|slot| (slot.end, SlotMapValue::from_slot(slot)))
705 .collect(),
706 read_from_replica,
707 }
708 }
709
710 #[cfg(feature = "cluster-async")]
711 pub fn fill_slots(&mut self, slots: Vec<Slot>) {
712 for slot in slots {
713 self.slots.insert(slot.end, SlotMapValue::from_slot(slot));
714 }
715 }
716
717 pub fn slot_addr_for_route(&self, route: &Route) -> Option<&str> {
718 let slot = route.slot();
719 self.slots
720 .range(slot..)
721 .next()
722 .and_then(|(end, slot_value)| {
723 if slot <= *end && slot_value.start <= slot {
724 Some(
725 slot_value
726 .addrs
727 .slot_addr(route.slot_addr(), self.read_from_replica),
728 )
729 } else {
730 None
731 }
732 })
733 }
734
735 #[cfg(feature = "cluster-async")]
736 pub fn clear(&mut self) {
737 self.slots.clear();
738 }
739
740 pub fn values(&self) -> impl Iterator<Item = &SlotAddrs> {
741 self.slots.values().map(|slot_value| &slot_value.addrs)
742 }
743
744 fn all_unique_addresses(&self, only_primaries: bool) -> HashSet<&str> {
745 let mut addresses: HashSet<&str> = HashSet::new();
746 if only_primaries {
747 addresses.extend(
748 self.values().map(|slot_addrs| {
749 slot_addrs.slot_addr(&SlotAddr::Master, self.read_from_replica)
750 }),
751 );
752 } else {
753 addresses.extend(
754 self.values()
755 .flat_map(|slot_addrs| slot_addrs.into_iter())
756 .map(|str| str.as_str()),
757 );
758 }
759
760 addresses
761 }
762
763 pub fn addresses_for_all_primaries(&self) -> HashSet<&str> {
764 self.all_unique_addresses(true)
765 }
766
767 pub fn addresses_for_all_nodes(&self) -> HashSet<&str> {
768 self.all_unique_addresses(false)
769 }
770
771 pub fn addresses_for_multi_slot<'a, 'b>(
772 &'a self,
773 routes: &'b [(Route, Vec<usize>)],
774 ) -> impl Iterator<Item = Option<&'a str>> + 'a
775 where
776 'b: 'a,
777 {
778 routes
779 .iter()
780 .map(|(route, _)| self.slot_addr_for_route(route))
781 }
782}
783
784#[derive(Eq, PartialEq, Clone, Copy, Debug, Hash)]
787pub struct Route(u16, SlotAddr);
788
789impl Route {
790 pub fn new(slot: u16, slot_addr: SlotAddr) -> Self {
792 Self(slot, slot_addr)
793 }
794
795 pub(crate) fn slot(&self) -> u16 {
796 self.0
797 }
798
799 pub(crate) fn slot_addr(&self) -> &SlotAddr {
800 &self.1
801 }
802}
803
/// Extracts the hashtag of `key`: the bytes between the first `{` and the
/// first `}` that follows it.
///
/// Returns `None` when there is no `{`, no `}` after it, or nothing between
/// the two. Later brace pairs are never considered.
fn get_hashtag(key: &[u8]) -> Option<&[u8]> {
    let open = key.iter().position(|&byte| byte == b'{')?;
    let after_open = &key[open + 1..];
    let tag_len = after_open.iter().position(|&byte| byte == b'}')?;

    if tag_len == 0 {
        None
    } else {
        Some(&after_open[..tag_len])
    }
}
812
// Unit tests for hashtag extraction, command routing, multi-slot command
// splitting, result combination and slot map lookups.
#[cfg(test)]
mod tests {
    use core::panic;
    use std::collections::HashSet;

    use super::{
        command_for_multi_slot_indices, get_hashtag, slot, MultipleNodeRoutingInfo, Route,
        RoutingInfo, SingleNodeRoutingInfo, Slot, SlotAddr, SlotMap,
    };
    use crate::{
        cluster_routing::{AggregateOp, ResponsePolicy},
        cmd,
        parser::parse_redis_value,
        Value,
    };

    // Only the first `{`...`}` pair counts; an empty first pair means "no hashtag".
    #[test]
    fn test_get_hashtag() {
        assert_eq!(get_hashtag(&b"foo{bar}baz"[..]), Some(&b"bar"[..]));
        assert_eq!(get_hashtag(&b"foo{}{baz}"[..]), None);
        assert_eq!(get_hashtag(&b"foo{{bar}}zap"[..]), Some(&b"{bar"[..]));
    }

    // Command names and the STREAMS keyword are matched case-insensitively.
    #[test]
    fn test_routing_info_mixed_capatalization() {
        let mut upper = cmd("XREAD");
        upper.arg("STREAMS").arg("foo").arg(0);

        let mut lower = cmd("xread");
        lower.arg("streams").arg("foo").arg(0);

        assert_eq!(
            RoutingInfo::for_routable(&upper).unwrap(),
            RoutingInfo::for_routable(&lower).unwrap()
        );

        let mut mixed = cmd("xReAd");
        mixed.arg("StReAmS").arg("foo").arg(0);

        assert_eq!(
            RoutingInfo::for_routable(&lower).unwrap(),
            RoutingInfo::for_routable(&mixed).unwrap()
        );
    }

    #[test]
    fn test_routing_info() {
        let mut test_cmds = vec![];

        // Routed to all masters.
        let mut test_cmd = cmd("FLUSHALL");
        test_cmd.arg("");
        test_cmds.push(test_cmd);

        // Routed to a random node.
        test_cmd = cmd("ECHO");
        test_cmd.arg("");
        test_cmds.push(test_cmd);

        // Routed by the key at argument 1.
        test_cmd = cmd("SET");
        test_cmd.arg("42");
        test_cmds.push(test_cmd);

        // Routed by the key at argument 2.
        test_cmd = cmd("XINFO");
        test_cmd.arg("GROUPS").arg("FOOBAR");
        test_cmds.push(test_cmd);

        // Key count 0: routed to a random node.
        test_cmd = cmd("EVAL");
        test_cmd.arg("FOO").arg("0").arg("BAR");
        test_cmds.push(test_cmd);

        // Non-zero key count: routed by the key at argument 3.
        test_cmd = cmd("EVAL");
        test_cmd.arg("FOO").arg("4").arg("BAR");
        test_cmds.push(test_cmd);

        // Routed by the argument following STREAMS.
        test_cmd = cmd("XREAD");
        test_cmd.arg("STREAMS").arg("4");
        test_cmds.push(test_cmd);

        // Same, with another argument in front of STREAMS.
        test_cmd = cmd("XREAD");
        test_cmd.arg("FOO").arg("STREAMS").arg("4");
        test_cmds.push(test_cmd);

        // Routing must be the same whether computed from the `Cmd` or from the
        // parsed packed command (`Value`).
        for cmd in test_cmds {
            let value = parse_redis_value(&cmd.get_packed_command()).unwrap();
            assert_eq!(
                RoutingInfo::for_routable(&value).unwrap(),
                RoutingInfo::for_routable(&cmd).unwrap(),
            );
        }

        // All masters, every node must succeed.
        for cmd in [cmd("FLUSHALL"), cmd("FLUSHDB"), cmd("PING")] {
            assert_eq!(
                RoutingInfo::for_routable(&cmd),
                Some(RoutingInfo::MultiNode((
                    MultipleNodeRoutingInfo::AllMasters,
                    Some(ResponsePolicy::AllSucceeded)
                )))
            );
        }

        assert_eq!(
            RoutingInfo::for_routable(&cmd("DBSIZE")),
            Some(RoutingInfo::MultiNode((
                MultipleNodeRoutingInfo::AllMasters,
                Some(ResponsePolicy::Aggregate(AggregateOp::Sum))
            )))
        );

        assert_eq!(
            RoutingInfo::for_routable(&cmd("SCRIPT KILL")),
            Some(RoutingInfo::MultiNode((
                MultipleNodeRoutingInfo::AllMasters,
                Some(ResponsePolicy::OneSucceeded)
            )))
        );

        assert_eq!(
            RoutingInfo::for_routable(&cmd("INFO")),
            Some(RoutingInfo::MultiNode((
                MultipleNodeRoutingInfo::AllMasters,
                Some(ResponsePolicy::Special)
            )))
        );

        assert_eq!(
            RoutingInfo::for_routable(&cmd("KEYS")),
            Some(RoutingInfo::MultiNode((
                MultipleNodeRoutingInfo::AllMasters,
                Some(ResponsePolicy::CombineArrays)
            )))
        );

        // Commands for which no routing is produced.
        for cmd in vec![
            cmd("SCAN"),
            cmd("SHUTDOWN"),
            cmd("SLAVEOF"),
            cmd("REPLICAOF"),
            cmd("MOVE"),
            cmd("BITOP"),
        ] {
            assert_eq!(
                RoutingInfo::for_routable(&cmd),
                None,
                "{}",
                std::str::from_utf8(cmd.arg_idx(0).unwrap()).unwrap()
            );
        }

        // Scripts with zero keys can run on any node.
        for cmd in [
            cmd("EVAL").arg(r#"redis.call("PING");"#).arg(0),
            cmd("EVALSHA").arg(r#"redis.call("PING");"#).arg(0),
        ] {
            assert_eq!(
                RoutingInfo::for_routable(cmd),
                Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
            );
        }

        // Commands whose key is not at argument 1, with the expected route.
        for (cmd, expected) in [
            (
                cmd("EVAL")
                    .arg(r#"redis.call("GET, KEYS[1]");"#)
                    .arg(1)
                    .arg("foo"),
                Some(RoutingInfo::SingleNode(
                    SingleNodeRoutingInfo::SpecificNode(Route::new(slot(b"foo"), SlotAddr::Master)),
                )),
            ),
            (
                cmd("XGROUP")
                    .arg("CREATE")
                    .arg("mystream")
                    .arg("workers")
                    .arg("$")
                    .arg("MKSTREAM"),
                Some(RoutingInfo::SingleNode(
                    SingleNodeRoutingInfo::SpecificNode(Route::new(
                        slot(b"mystream"),
                        SlotAddr::Master,
                    )),
                )),
            ),
            (
                cmd("XINFO").arg("GROUPS").arg("foo"),
                Some(RoutingInfo::SingleNode(
                    SingleNodeRoutingInfo::SpecificNode(Route::new(
                        slot(b"foo"),
                        SlotAddr::ReplicaOptional,
                    )),
                )),
            ),
            (
                cmd("XREADGROUP")
                    .arg("GROUP")
                    .arg("wkrs")
                    .arg("consmrs")
                    .arg("STREAMS")
                    .arg("mystream"),
                Some(RoutingInfo::SingleNode(
                    SingleNodeRoutingInfo::SpecificNode(Route::new(
                        slot(b"mystream"),
                        SlotAddr::Master,
                    )),
                )),
            ),
            (
                cmd("XREAD")
                    .arg("COUNT")
                    .arg("2")
                    .arg("STREAMS")
                    .arg("mystream")
                    .arg("writers")
                    .arg("0-0")
                    .arg("0-0"),
                Some(RoutingInfo::SingleNode(
                    SingleNodeRoutingInfo::SpecificNode(Route::new(
                        slot(b"mystream"),
                        SlotAddr::ReplicaOptional,
                    )),
                )),
            ),
        ] {
            assert_eq!(
                RoutingInfo::for_routable(cmd),
                expected,
                "{}",
                std::str::from_utf8(cmd.arg_idx(0).unwrap()).unwrap()
            );
        }
    }

    // Routing computed from raw packed commands with 16-byte binary keys.
    #[test]
    fn test_slot_for_packed_cmd() {
        // "*2\r\n$6\r\nEXISTS\r\n$16\r\n<16 key bytes>\r\n"
        assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[
                42, 50, 13, 10, 36, 54, 13, 10, 69, 88, 73, 83, 84, 83, 13, 10, 36, 49, 54, 13, 10,
                244, 93, 23, 40, 126, 127, 253, 33, 89, 47, 185, 204, 171, 249, 96, 139, 13, 10
            ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::ReplicaOptional)))) if slot == 964));

        // "*6\r\n$3\r\nSET\r\n$16\r\n<16 key bytes>\r\n$4\r\ntrue\r\n$2\r\nNX\r\n$2\r\nPX\r\n$7\r\n1800000\r\n"
        assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[
                42, 54, 13, 10, 36, 51, 13, 10, 83, 69, 84, 13, 10, 36, 49, 54, 13, 10, 36, 241,
                197, 111, 180, 254, 5, 175, 143, 146, 171, 39, 172, 23, 164, 145, 13, 10, 36, 52,
                13, 10, 116, 114, 117, 101, 13, 10, 36, 50, 13, 10, 78, 88, 13, 10, 36, 50, 13, 10,
                80, 88, 13, 10, 36, 55, 13, 10, 49, 56, 48, 48, 48, 48, 48, 13, 10
            ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Master)))) if slot == 8352));

        // Same SET command shape with a different key; this key contains the
        // bytes 123 (`{`) and 125 (`}`).
        assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[
                42, 54, 13, 10, 36, 51, 13, 10, 83, 69, 84, 13, 10, 36, 49, 54, 13, 10, 169, 233,
                247, 59, 50, 247, 100, 232, 123, 140, 2, 101, 125, 221, 66, 170, 13, 10, 36, 52,
                13, 10, 116, 114, 117, 101, 13, 10, 36, 50, 13, 10, 78, 88, 13, 10, 36, 50, 13, 10,
                80, 88, 13, 10, 36, 55, 13, 10, 49, 56, 48, 48, 48, 48, 48, 13, 10
            ]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Master)))) if slot == 5210));
    }

    // Keys in different slots are grouped per route; "bar" and "{bar}vaz" share
    // a slot because of the hashtag.
    #[test]
    fn test_multi_shard() {
        let mut cmd = cmd("DEL");
        cmd.arg("foo").arg("bar").arg("baz").arg("{bar}vaz");
        let routing = RoutingInfo::for_routable(&cmd);
        let mut expected = std::collections::HashMap::new();
        expected.insert(Route(4813, SlotAddr::Master), vec![2]);
        expected.insert(Route(5061, SlotAddr::Master), vec![1, 3]);
        expected.insert(Route(12182, SlotAddr::Master), vec![0]);

        assert!(
            matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot(vec), Some(ResponsePolicy::Aggregate(AggregateOp::Sum))))) if {
                let routes = vec.clone().into_iter().collect();
                expected == routes
            }),
            "{routing:?}"
        );

        // Same keys with a read-only command: replica-optional routes.
        let mut cmd = crate::cmd("MGET");
        cmd.arg("foo").arg("bar").arg("baz").arg("{bar}vaz");
        let routing = RoutingInfo::for_routable(&cmd);
        let mut expected = std::collections::HashMap::new();
        expected.insert(Route(4813, SlotAddr::ReplicaOptional), vec![2]);
        expected.insert(Route(5061, SlotAddr::ReplicaOptional), vec![1, 3]);
        expected.insert(Route(12182, SlotAddr::ReplicaOptional), vec![0]);

        assert!(
            matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot(vec), Some(ResponsePolicy::CombineArrays)))) if {
                let routes = vec.clone().into_iter().collect();
                expected ==routes
            }),
            "{routing:?}"
        );
    }

    // Each per-route sub-command carries the command name plus exactly the keys
    // of that route.
    #[test]
    fn test_command_creation_for_multi_shard() {
        let mut original_cmd = cmd("DEL");
        original_cmd
            .arg("foo")
            .arg("bar")
            .arg("baz")
            .arg("{bar}vaz");
        let routing = RoutingInfo::for_routable(&original_cmd);
        let expected = [vec![0], vec![1, 3], vec![2]];

        let mut indices: Vec<_> = match routing {
            Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot(vec), _))) => {
                vec.into_iter().map(|(_, indices)| indices).collect()
            }
            _ => panic!("unexpected routing: {routing:?}"),
        };
        // The routes come out of a hash map in arbitrary order; sort the groups
        // by their first index so they line up with `expected`.
        indices.sort_by(|prev, next| prev.iter().next().unwrap().cmp(next.iter().next().unwrap()));
        for (index, indices) in indices.into_iter().enumerate() {
            let cmd = command_for_multi_slot_indices(&original_cmd, indices.iter());
            let expected_indices = &expected[index];
            assert_eq!(original_cmd.arg_idx(0), cmd.arg_idx(0));
            for (index, target_index) in expected_indices.iter().enumerate() {
                // Key offsets are relative to the first argument after the
                // command name.
                let target_index = target_index + 1;
                assert_eq!(original_cmd.arg_idx(target_index), cmd.arg_idx(index + 1));
            }
        }
    }

    // When every key hashes to the same slot, routing collapses to one node.
    #[test]
    fn test_combine_multi_shard_to_single_node_when_all_keys_are_in_same_slot() {
        let mut cmd = cmd("DEL");
        cmd.arg("foo").arg("{foo}bar").arg("{foo}baz");
        let routing = RoutingInfo::for_routable(&cmd);

        assert!(
            matches!(
                routing,
                Some(RoutingInfo::SingleNode(
                    SingleNodeRoutingInfo::SpecificNode(Route(12182, SlotAddr::Master))
                ))
            ),
            "{routing:?}"
        );
    }

    // Lookups at the start, middle and end of each range, and past the last one.
    #[test]
    fn test_slot_map() {
        let slot_map = SlotMap::from_slots(
            vec![
                Slot {
                    start: 1,
                    end: 1000,
                    master: "node1:6379".to_owned(),
                    replicas: vec!["replica1:6379".to_owned()],
                },
                Slot {
                    start: 1001,
                    end: 2000,
                    master: "node2:6379".to_owned(),
                    replicas: vec!["replica2:6379".to_owned()],
                },
            ],
            true,
        );

        assert_eq!(
            "node1:6379",
            slot_map
                .slot_addr_for_route(&Route::new(1, SlotAddr::Master))
                .unwrap()
        );
        assert_eq!(
            "node1:6379",
            slot_map
                .slot_addr_for_route(&Route::new(500, SlotAddr::Master))
                .unwrap()
        );
        assert_eq!(
            "node1:6379",
            slot_map
                .slot_addr_for_route(&Route::new(1000, SlotAddr::Master))
                .unwrap()
        );
        assert_eq!(
            "replica1:6379",
            slot_map
                .slot_addr_for_route(&Route::new(1000, SlotAddr::ReplicaOptional))
                .unwrap()
        );
        assert_eq!(
            "node2:6379",
            slot_map
                .slot_addr_for_route(&Route::new(1001, SlotAddr::Master))
                .unwrap()
        );
        assert_eq!(
            "node2:6379",
            slot_map
                .slot_addr_for_route(&Route::new(1500, SlotAddr::Master))
                .unwrap()
        );
        assert_eq!(
            "node2:6379",
            slot_map
                .slot_addr_for_route(&Route::new(2000, SlotAddr::Master))
                .unwrap()
        );
        assert!(slot_map
            .slot_addr_for_route(&Route::new(2001, SlotAddr::Master))
            .is_none());
    }

    // With read-from-replica disabled, ReplicaOptional resolves to the primary
    // while ReplicaRequired still resolves to a replica.
    #[test]
    fn test_slot_map_when_read_from_replica_is_false() {
        let slot_map = SlotMap::from_slots(
            vec![Slot {
                start: 1,
                end: 1000,
                master: "node1:6379".to_owned(),
                replicas: vec!["replica1:6379".to_owned()],
            }],
            false,
        );

        assert_eq!(
            "node1:6379",
            slot_map
                .slot_addr_for_route(&Route::new(1000, SlotAddr::ReplicaOptional))
                .unwrap()
        );
        assert_eq!(
            "replica1:6379",
            slot_map
                .slot_addr_for_route(&Route::new(1000, SlotAddr::ReplicaRequired))
                .unwrap()
        );
    }

    // Elements of each response land at the positions given by the sorting order.
    #[test]
    fn test_combining_results_into_single_array() {
        let res1 = Value::Array(vec![Value::Nil, Value::Okay]);
        let res2 = Value::Array(vec![
            Value::BulkString("1".as_bytes().to_vec()),
            Value::BulkString("4".as_bytes().to_vec()),
        ]);
        let res3 = Value::Array(vec![Value::SimpleString("2".to_string()), Value::Int(3)]);
        let results = super::combine_and_sort_array_results(
            vec![res1, res2, res3],
            [vec![0, 5], vec![1, 4], vec![2, 3]].iter(),
        );

        assert_eq!(
            results.unwrap(),
            Value::Array(vec![
                Value::Nil,
                Value::BulkString("1".as_bytes().to_vec()),
                Value::SimpleString("2".to_string()),
                Value::Int(3),
                Value::BulkString("4".as_bytes().to_vec()),
                Value::Okay,
            ])
        );
    }

    // Shared fixture: four ranges over three primaries. node2 serves two
    // ranges, and slot 1001 is deliberately left uncovered.
    fn get_slot_map(read_from_replica: bool) -> SlotMap {
        SlotMap::from_slots(
            vec![
                Slot::new(
                    1,
                    1000,
                    "node1:6379".to_owned(),
                    vec!["replica1:6379".to_owned()],
                ),
                Slot::new(
                    1002,
                    2000,
                    "node2:6379".to_owned(),
                    vec!["replica2:6379".to_owned(), "replica3:6379".to_owned()],
                ),
                Slot::new(
                    2001,
                    3000,
                    "node3:6379".to_owned(),
                    vec![
                        "replica4:6379".to_owned(),
                        "replica5:6379".to_owned(),
                        "replica6:6379".to_owned(),
                    ],
                ),
                Slot::new(
                    3001,
                    4000,
                    "node2:6379".to_owned(),
                    vec!["replica2:6379".to_owned(), "replica3:6379".to_owned()],
                ),
            ],
            read_from_replica,
        )
    }

    // node2 appears in two ranges but only once in the result.
    #[test]
    fn test_slot_map_get_all_primaries() {
        let slot_map = get_slot_map(false);
        let addresses = slot_map.addresses_for_all_primaries();
        assert_eq!(
            addresses,
            HashSet::from_iter(["node1:6379", "node2:6379", "node3:6379"])
        );
    }

    #[test]
    fn test_slot_map_get_all_nodes() {
        let slot_map = get_slot_map(false);
        let addresses = slot_map.addresses_for_all_nodes();
        assert_eq!(
            addresses,
            HashSet::from_iter([
                "node1:6379",
                "node2:6379",
                "node3:6379",
                "replica1:6379",
                "replica2:6379",
                "replica3:6379",
                "replica4:6379",
                "replica5:6379",
                "replica6:6379"
            ])
        );
    }

    // The replica for slot 2001 is picked at random, so any of its three
    // replicas is accepted.
    #[test]
    fn test_slot_map_get_multi_node() {
        let slot_map = get_slot_map(true);
        let routes = vec![
            (Route::new(1, SlotAddr::Master), vec![]),
            (Route::new(2001, SlotAddr::ReplicaOptional), vec![]),
        ];
        let addresses = slot_map
            .addresses_for_multi_slot(&routes)
            .collect::<Vec<_>>();
        assert!(addresses.contains(&Some("node1:6379")));
        assert!(
            addresses.contains(&Some("replica4:6379"))
                || addresses.contains(&Some("replica5:6379"))
                || addresses.contains(&Some("replica6:6379"))
        );
    }

    #[test]
    fn test_slot_map_should_ignore_replicas_in_multi_slot_if_read_from_replica_is_false() {
        let slot_map = get_slot_map(false);
        let routes = vec![
            (Route::new(1, SlotAddr::Master), vec![]),
            (Route::new(2001, SlotAddr::ReplicaOptional), vec![]),
        ];
        let addresses = slot_map
            .addresses_for_multi_slot(&routes)
            .collect::<Vec<_>>();
        assert_eq!(addresses, vec![Some("node1:6379"), Some("node3:6379")]);
    }

    // Addresses are returned once per route, without de-duplication. The
    // replica routes target the range with a single replica, so the random
    // replica choice is deterministic here.
    #[test]
    fn test_slot_map_get_repeating_addresses_when_the_same_node_is_found_in_multi_slot() {
        let slot_map = get_slot_map(true);
        let routes = vec![
            (Route::new(1, SlotAddr::ReplicaOptional), vec![]),
            (Route::new(2001, SlotAddr::Master), vec![]),
            (Route::new(2, SlotAddr::ReplicaOptional), vec![]),
            (Route::new(2002, SlotAddr::Master), vec![]),
            (Route::new(3, SlotAddr::ReplicaOptional), vec![]),
            (Route::new(2003, SlotAddr::Master), vec![]),
        ];
        let addresses = slot_map
            .addresses_for_multi_slot(&routes)
            .collect::<Vec<_>>();
        assert_eq!(
            addresses,
            vec![
                Some("replica1:6379"),
                Some("node3:6379"),
                Some("replica1:6379"),
                Some("node3:6379"),
                Some("replica1:6379"),
                Some("node3:6379")
            ]
        );
    }

    // Slots 5000 and 6000 lie beyond every range of the fixture.
    #[test]
    fn test_slot_map_get_none_when_slot_is_missing_from_multi_slot() {
        let slot_map = get_slot_map(true);
        let routes = vec![
            (Route::new(1, SlotAddr::ReplicaOptional), vec![]),
            (Route::new(5000, SlotAddr::Master), vec![]),
            (Route::new(6000, SlotAddr::ReplicaOptional), vec![]),
            (Route::new(2002, SlotAddr::Master), vec![]),
        ];
        let addresses = slot_map
            .addresses_for_multi_slot(&routes)
            .collect::<Vec<_>>();
        assert_eq!(
            addresses,
            vec![Some("replica1:6379"), None, None, Some("node3:6379")]
        );
    }
}