1use std::cell::RefCell;
66use std::collections::HashSet;
67use std::thread;
68use std::time::Duration;
69
70mod pipeline;
71
72pub use super::client::{ClusterClient, ClusterClientBuilder};
73use super::topology::parse_slots;
74use super::{
75 client::ClusterParams,
76 routing::{Redirect, Route, RoutingInfo},
77 slot_map::{SlotMap, SLOT_SIZE},
78};
79use crate::cluster_handling::{get_connection_info, slot_cmd, split_node_address};
80use crate::cluster_routing::{
81 MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo, SlotAddr,
82};
83use crate::cmd::{cmd, Cmd};
84use crate::connection::{connect, Connection, ConnectionInfo, ConnectionLike};
85use crate::errors::{ErrorKind, RedisError, RetryMethod};
86use crate::parser::parse_redis_value;
87use crate::types::{HashMap, RedisResult, Value};
88use crate::IntoConnectionInfo;
89pub use crate::TlsMode; use arcstr::ArcStr;
91use pipeline::UNROUTABLE_ERROR;
92use rand::{rng, seq::IteratorRandom, Rng};
93
94pub use pipeline::{cluster_pipe, ClusterPipeline};
95
96#[derive(Clone)]
97enum Input<'a> {
98 Slice {
99 cmd: &'a [u8],
100 routable: Value,
101 },
102 Cmd(&'a Cmd),
103 Commands {
104 cmd: &'a [u8],
105 offset: usize,
106 count: usize,
107 },
108}
109
110impl<'a> Input<'a> {
111 fn send(&'a self, connection: &mut impl ConnectionLike) -> RedisResult<Output> {
112 match self {
113 Input::Slice { cmd, routable: _ } => connection
114 .req_packed_command(cmd)
115 .and_then(|value| value.extract_error())
116 .map(Output::Single),
117 Input::Cmd(cmd) => connection
118 .req_command(cmd)
119 .and_then(|value| value.extract_error())
120 .map(Output::Single),
121 Input::Commands { cmd, offset, count } => connection
122 .req_packed_commands(cmd, *offset, *count)
123 .and_then(Value::extract_error_vec)
124 .map(Output::Multi),
125 }
126 }
127}
128
129impl Routable for Input<'_> {
130 fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
131 match self {
132 Input::Slice { cmd: _, routable } => routable.arg_idx(idx),
133 Input::Cmd(cmd) => cmd.arg_idx(idx),
134 Input::Commands { .. } => None,
135 }
136 }
137
138 fn position(&self, candidate: &[u8]) -> Option<usize> {
139 match self {
140 Input::Slice { cmd: _, routable } => routable.position(candidate),
141 Input::Cmd(cmd) => cmd.position(candidate),
142 Input::Commands { .. } => None,
143 }
144 }
145}
146
147enum Output {
148 Single(Value),
149 Multi(Vec<Value>),
150}
151
152impl From<Output> for Value {
153 fn from(value: Output) -> Self {
154 match value {
155 Output::Single(value) => value,
156 Output::Multi(values) => Value::Array(values),
157 }
158 }
159}
160
161impl From<Output> for Vec<Value> {
162 fn from(value: Output) -> Self {
163 match value {
164 Output::Single(value) => vec![value],
165 Output::Multi(values) => values,
166 }
167 }
168}
169
170pub trait Connect: Sized {
173 fn connect<T>(info: T, timeout: Option<Duration>) -> RedisResult<Self>
175 where
176 T: IntoConnectionInfo;
177
178 fn send_packed_command(&mut self, cmd: &[u8]) -> RedisResult<()>;
183
184 fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()>;
190
191 fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()>;
197
198 fn recv_response(&mut self) -> RedisResult<Value>;
201}
202
203impl Connect for Connection {
204 fn connect<T>(info: T, timeout: Option<Duration>) -> RedisResult<Self>
205 where
206 T: IntoConnectionInfo,
207 {
208 connect(&info.into_connection_info()?, timeout)
209 }
210
211 fn send_packed_command(&mut self, cmd: &[u8]) -> RedisResult<()> {
212 Self::send_packed_command(self, cmd)
213 }
214
215 fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
216 Self::set_write_timeout(self, dur)
217 }
218
219 fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
220 Self::set_read_timeout(self, dur)
221 }
222
223 fn recv_response(&mut self) -> RedisResult<Value> {
224 Self::recv_response(self)
225 }
226}
227
228#[derive(Clone, Default)]
230pub struct ClusterConfig {
231 pub(crate) connection_timeout: Option<Duration>,
232 pub(crate) response_timeout: Option<Duration>,
233 #[cfg(feature = "cluster-async")]
234 pub(crate) async_push_sender: Option<std::sync::Arc<dyn crate::aio::AsyncPushSender>>,
235 #[cfg(feature = "cluster-async")]
236 pub(crate) async_dns_resolver: Option<std::sync::Arc<dyn crate::io::AsyncDNSResolver>>,
237}
238
239impl ClusterConfig {
240 pub fn new() -> Self {
242 Self::default()
243 }
244
245 pub fn set_connection_timeout(mut self, connection_timeout: std::time::Duration) -> Self {
247 self.connection_timeout = Some(connection_timeout);
248 self
249 }
250
251 pub fn set_response_timeout(mut self, response_timeout: std::time::Duration) -> Self {
253 self.response_timeout = Some(response_timeout);
254 self
255 }
256
257 #[cfg(feature = "cluster-async")]
258 pub fn set_push_sender(mut self, sender: impl crate::aio::AsyncPushSender) -> Self {
283 self.async_push_sender = Some(std::sync::Arc::new(sender));
284 self
285 }
286
287 #[cfg(feature = "cluster-async")]
291 pub fn set_dns_resolver(mut self, resolver: impl crate::io::AsyncDNSResolver) -> Self {
292 self.async_dns_resolver = Some(std::sync::Arc::new(resolver));
293 self
294 }
295}
296
297pub struct ClusterConnection<C = Connection> {
302 initial_nodes: Vec<ConnectionInfo>,
303 connections: RefCell<HashMap<ArcStr, C>>,
304 slots: RefCell<SlotMap>,
305 auto_reconnect: RefCell<bool>,
306 read_timeout: RefCell<Option<Duration>>,
307 write_timeout: RefCell<Option<Duration>>,
308 cluster_params: ClusterParams,
309}
310
311impl<C> ClusterConnection<C>
312where
313 C: ConnectionLike + Connect,
314{
315 pub(crate) fn new(
316 cluster_params: ClusterParams,
317 initial_nodes: Vec<ConnectionInfo>,
318 ) -> RedisResult<Self> {
319 let connection = Self {
320 connections: RefCell::new(HashMap::new()),
321 slots: RefCell::new(SlotMap::new(cluster_params.read_from_replicas)),
322 auto_reconnect: RefCell::new(true),
323 read_timeout: RefCell::new(cluster_params.response_timeout),
324 write_timeout: RefCell::new(None),
325 initial_nodes: initial_nodes.to_vec(),
326 cluster_params,
327 };
328 connection.create_initial_connections()?;
329
330 Ok(connection)
331 }
332
333 pub fn set_auto_reconnect(&self, value: bool) {
336 let mut auto_reconnect = self.auto_reconnect.borrow_mut();
337 *auto_reconnect = value;
338 }
339
340 pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
346 if dur.is_some() && dur.unwrap().is_zero() {
348 return Err(RedisError::from((
349 ErrorKind::InvalidClientConfig,
350 "Duration should be None or non-zero.",
351 )));
352 }
353
354 let mut t = self.write_timeout.borrow_mut();
355 *t = dur;
356 let connections = self.connections.borrow();
357 for conn in connections.values() {
358 conn.set_write_timeout(dur)?;
359 }
360 Ok(())
361 }
362
363 pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
369 if dur.is_some() && dur.unwrap().is_zero() {
371 return Err(RedisError::from((
372 ErrorKind::InvalidClientConfig,
373 "Duration should be None or non-zero.",
374 )));
375 }
376
377 let mut t = self.read_timeout.borrow_mut();
378 *t = dur;
379 let connections = self.connections.borrow();
380 for conn in connections.values() {
381 conn.set_read_timeout(dur)?;
382 }
383 Ok(())
384 }
385
386 #[doc(hidden)]
388 pub fn check_connection(&mut self) -> bool {
389 <Self as ConnectionLike>::check_connection(self)
390 }
391
392 pub(crate) fn execute_pipeline(&mut self, pipe: &ClusterPipeline) -> RedisResult<Vec<Value>> {
393 self.send_recv_and_retry_cmds(pipe.commands())
394 }
395
396 fn create_initial_connections(&self) -> RedisResult<()> {
404 let mut connections = HashMap::with_capacity(self.initial_nodes.len());
405 let mut failed_connections = Vec::new();
406
407 for info in self.initial_nodes.iter() {
408 let addr = info.addr.to_string().into();
409
410 match self.connect(&addr) {
411 Ok(mut conn) => {
412 if conn.check_connection() {
413 connections.insert(addr, conn);
414 break;
415 } else {
416 failed_connections.push((
417 addr,
418 RedisError::from((
419 ErrorKind::Io,
420 "Node failed to respond to connection check,",
421 )),
422 ));
423 }
424 }
425 Err(conn_err) => {
426 failed_connections.push((addr, conn_err));
427 }
428 }
429 }
430
431 if connections.is_empty() {
432 let detail = if failed_connections.is_empty() {
434 "List of initial nodes is empty".to_string()
435 } else {
436 let mut formatted_detail = "Failed to connect to each cluster node (".to_string();
437
438 for (index, (addr, conn_err)) in failed_connections.into_iter().enumerate() {
439 if index != 0 {
440 formatted_detail += "; ";
441 }
442 use std::fmt::Write;
443 let _ = write!(&mut formatted_detail, "{addr}: {conn_err}");
444 }
445 formatted_detail += ")";
446 formatted_detail
447 };
448
449 return Err(RedisError::from((
450 ErrorKind::Io,
451 "It failed to check startup nodes.",
452 detail,
453 )));
454 }
455
456 *self.connections.borrow_mut() = connections;
457 self.refresh_slots()?;
458 Ok(())
459 }
460
461 fn refresh_slots(&self) -> RedisResult<()> {
463 let mut slots = self.slots.borrow_mut();
464 *slots = self.create_new_slots()?;
465
466 let mut nodes = slots.values().flatten().collect::<Vec<_>>();
467 nodes.sort_unstable();
468 nodes.dedup();
469
470 let mut connections = self.connections.borrow_mut();
471 *connections = nodes
472 .into_iter()
473 .filter_map(|addr| {
474 if let Some(mut conn) = connections.remove(addr) {
475 if conn.check_connection() {
476 return Some((addr.clone(), conn));
477 }
478 }
479
480 if let Ok(mut conn) = self.connect(addr) {
481 if conn.check_connection() {
482 return Some((addr.clone(), conn));
483 }
484 }
485
486 None
487 })
488 .collect();
489
490 Ok(())
491 }
492
493 fn create_new_slots(&self) -> RedisResult<SlotMap> {
494 let mut connections = self.connections.borrow_mut();
495 let mut new_slots = None;
496
497 for (addr, conn) in connections.iter_mut() {
498 let value = conn.req_command(&slot_cmd())?;
499 if let Ok(slots_data) = parse_slots(value, addr.rsplit_once(':').unwrap().0) {
500 new_slots = Some(SlotMap::from_slots(
501 slots_data,
502 self.cluster_params.read_from_replicas,
503 ));
504 break;
505 }
506 }
507
508 match new_slots {
509 Some(new_slots) => Ok(new_slots),
510 None => Err(RedisError::from((
511 ErrorKind::Client,
512 "Slot refresh error. didn't get any slots from server",
513 ))),
514 }
515 }
516
517 fn connect(&self, node: &ArcStr) -> RedisResult<C> {
518 let info = get_connection_info(node, &self.cluster_params)?;
519
520 let mut conn = C::connect(info, Some(self.cluster_params.connection_timeout))?;
521 if self.cluster_params.read_from_replicas {
522 cmd("READONLY").exec(&mut conn)?;
524 }
525 conn.set_read_timeout(*self.read_timeout.borrow())?;
526 conn.set_write_timeout(*self.write_timeout.borrow())?;
527 Ok(conn)
528 }
529
530 fn get_connection<'a>(
531 &self,
532 connections: &'a mut HashMap<ArcStr, C>,
533 route: &Route,
534 ) -> (ArcStr, RedisResult<&'a mut C>) {
535 let slots = self.slots.borrow();
536 if let Some(addr) = slots.slot_addr_for_route(route) {
537 (addr.clone(), self.get_connection_by_addr(connections, addr))
538 } else {
539 get_random_connection_or_error(connections)
542 }
543 }
544
545 fn get_connection_by_addr<'a>(
546 &self,
547 connections: &'a mut HashMap<ArcStr, C>,
548 addr: &ArcStr,
549 ) -> RedisResult<&'a mut C> {
550 match connections.entry(addr.clone()) {
551 std::collections::hash_map::Entry::Occupied(occupied_entry) => {
552 Ok(occupied_entry.into_mut())
553 }
554 std::collections::hash_map::Entry::Vacant(vacant_entry) => {
555 let conn = self.connect(addr)?;
558 Ok(vacant_entry.insert(conn))
559 }
560 }
561 }
562
563 fn get_addr_for_cmd(&self, cmd: &Cmd) -> RedisResult<ArcStr> {
564 let slots = self.slots.borrow();
565
566 let addr_for_slot = |route: Route| -> RedisResult<ArcStr> {
567 let slot_addr = slots
568 .slot_addr_for_route(&route)
569 .ok_or((ErrorKind::Client, "Missing slot coverage"))?;
570 Ok(slot_addr.clone())
571 };
572
573 match RoutingInfo::for_routable(cmd) {
574 Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => {
575 let mut rng = rng();
576 Ok(addr_for_slot(Route::new(
577 rng.random_range(0..SLOT_SIZE),
578 SlotAddr::Master,
579 ))?)
580 }
581 Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(route))) => {
582 Ok(addr_for_slot(route)?)
583 }
584 _ => fail!(UNROUTABLE_ERROR),
585 }
586 }
587
588 fn map_cmds_to_nodes(&self, cmds: &[Cmd]) -> RedisResult<Vec<NodeCmd>> {
589 let mut cmd_map: HashMap<ArcStr, NodeCmd> = HashMap::new();
590
591 for (idx, cmd) in cmds.iter().enumerate() {
592 let addr = self.get_addr_for_cmd(cmd)?;
593 let nc = cmd_map
594 .entry(addr.clone())
595 .or_insert_with(|| NodeCmd::new(addr));
596 nc.indexes.push(idx);
597 cmd.write_packed_command(&mut nc.pipe);
598 }
599
600 let mut result = Vec::new();
601 for (_, v) in cmd_map.drain() {
602 result.push(v);
603 }
604 Ok(result)
605 }
606
607 fn execute_on_all<'a>(
608 &'a self,
609 input: Input,
610 addresses: HashSet<&'a ArcStr>,
611 ) -> Vec<RedisResult<(&'a ArcStr, Value)>> {
612 addresses
613 .into_iter()
614 .map(|addr| {
615 let (host, port) = split_node_address(addr).unwrap();
616 self.request(
617 input.clone(),
618 Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
619 host: host.to_string(),
620 port,
621 })),
622 )
623 .map(|res| match res {
624 Output::Single(value) => (addr, value),
625 Output::Multi(values) => (addr, Value::Array(values)),
627 })
628 })
629 .collect()
630 }
631
632 fn execute_on_all_nodes<'a>(
633 &'a self,
634 input: Input,
635 slots: &'a mut SlotMap,
636 ) -> Vec<RedisResult<(&'a ArcStr, Value)>> {
637 self.execute_on_all(input, slots.addresses_for_all_nodes())
638 }
639
640 fn execute_on_all_primaries<'a>(
641 &'a self,
642 input: Input,
643 slots: &'a mut SlotMap,
644 ) -> Vec<RedisResult<(&'a ArcStr, Value)>> {
645 self.execute_on_all(input, slots.addresses_for_all_primaries())
646 }
647
648 fn execute_multi_slot<'a, 'b>(
649 &'a self,
650 input: Input,
651 slots: &'a mut SlotMap,
652 connections: &'a mut HashMap<ArcStr, C>,
653 routes: &'b [(Route, Vec<usize>)],
654 ) -> Vec<RedisResult<(&'a ArcStr, Value)>>
655 where
656 'b: 'a,
657 {
658 slots
659 .addresses_for_multi_slot(routes)
660 .enumerate()
661 .map(|(index, addr)| {
662 let addr = addr.ok_or(RedisError::from((
663 ErrorKind::Io,
664 "Couldn't find connection",
665 )))?;
666 let connection = self.get_connection_by_addr(connections, addr)?;
667 let (_, indices) = routes.get(index).unwrap();
668 let cmd =
669 crate::cluster_routing::command_for_multi_slot_indices(&input, indices.iter());
670 connection.req_command(&cmd).map(|res| (addr, res))
671 })
672 .collect()
673 }
674
675 fn execute_on_multiple_nodes(
676 &self,
677 input: Input,
678 routing: MultipleNodeRoutingInfo,
679 response_policy: Option<ResponsePolicy>,
680 ) -> RedisResult<Value> {
681 let mut connections = self.connections.borrow_mut();
682 let mut slots = self.slots.borrow_mut();
683
684 let results = match &routing {
685 MultipleNodeRoutingInfo::MultiSlot((routes, _)) => {
686 self.execute_multi_slot(input, &mut slots, &mut connections, routes)
687 }
688 MultipleNodeRoutingInfo::AllMasters => {
689 drop(connections);
690 self.execute_on_all_primaries(input, &mut slots)
691 }
692 MultipleNodeRoutingInfo::AllNodes => {
693 drop(connections);
694 self.execute_on_all_nodes(input, &mut slots)
695 }
696 };
697
698 match response_policy {
699 Some(ResponsePolicy::AllSucceeded) => {
700 let mut last_result = None;
701 for result in results {
702 last_result = Some(result?);
703 }
704
705 last_result
706 .ok_or(
707 (
708 ErrorKind::ClusterConnectionNotFound,
709 "No results received for multi-node operation",
710 )
711 .into(),
712 )
713 .map(|(_, res)| res)
714 }
715 Some(ResponsePolicy::OneSucceeded) => {
716 let mut last_failure = None;
717
718 for result in results {
719 match result {
720 Ok((_, val)) => return Ok(val),
721 Err(err) => last_failure = Some(err),
722 }
723 }
724
725 Err(last_failure
726 .unwrap_or_else(|| (ErrorKind::Io, "Couldn't find a connection").into()))
727 }
728 Some(ResponsePolicy::CombineMaps) => crate::cluster_routing::combine_map_results(
729 results
730 .into_iter()
731 .map(|result| result.map(|(_, value)| value))
732 .collect::<RedisResult<Vec<_>>>()?,
733 ),
734 Some(ResponsePolicy::FirstSucceededNonEmptyOrAllEmpty) => {
735 let mut last_failure = None;
741 let num_of_results = results.len();
742 let mut nil_counter = 0;
743 for result in results {
744 match result.map(|(_, res)| res) {
745 Ok(Value::Nil) => nil_counter += 1,
746 Ok(val) => return Ok(val),
747 Err(err) => last_failure = Some(err),
748 }
749 }
750 if nil_counter == num_of_results {
751 Ok(Value::Nil)
752 } else {
753 Err(last_failure
754 .unwrap_or_else(|| (ErrorKind::Io, "Couldn't find a connection").into()))
755 }
756 }
757 Some(ResponsePolicy::Aggregate(op)) => {
758 let results = results
759 .into_iter()
760 .map(|res| res.map(|(_, val)| val))
761 .collect::<RedisResult<Vec<_>>>()?;
762 crate::cluster_routing::aggregate(results, op)
763 }
764 Some(ResponsePolicy::AggregateLogical(op)) => {
765 let results = results
766 .into_iter()
767 .map(|res| res.map(|(_, val)| val))
768 .collect::<RedisResult<Vec<_>>>()?;
769 crate::cluster_routing::logical_aggregate(results, op)
770 }
771 Some(ResponsePolicy::CombineArrays) => {
772 let results = results
773 .into_iter()
774 .map(|res| res.map(|(_, val)| val))
775 .collect::<RedisResult<Vec<_>>>()?;
776 match routing {
777 MultipleNodeRoutingInfo::MultiSlot((vec, pattern)) => {
778 crate::cluster_routing::combine_and_sort_array_results(
779 results, &vec, &pattern,
780 )
781 }
782 _ => crate::cluster_routing::combine_array_results(results),
783 }
784 }
785 Some(ResponsePolicy::Special) | None => {
786 let results = results
789 .into_iter()
790 .map(|result| {
791 result.map(|(addr, val)| (Value::BulkString(addr.as_bytes().to_vec()), val))
792 })
793 .collect::<RedisResult<Vec<_>>>()?;
794 Ok(Value::Map(results))
795 }
796 }
797 }
798
799 #[allow(clippy::unnecessary_unwrap)]
800 fn request(&self, input: Input, route_option: Option<RoutingInfo>) -> RedisResult<Output> {
801 let single_node_routing = match route_option {
802 Some(RoutingInfo::SingleNode(single_node_routing)) => single_node_routing,
803 Some(RoutingInfo::MultiNode((multi_node_routing, response_policy))) => {
804 return self
805 .execute_on_multiple_nodes(input, multi_node_routing, response_policy)
806 .map(Output::Single);
807 }
808 None => fail!(UNROUTABLE_ERROR),
809 };
810
811 let mut retries = 0;
812 let mut redirected = None::<Redirect>;
813
814 loop {
815 let (addr, rv) = {
817 let mut connections = self.connections.borrow_mut();
818 let (addr, conn) = if let Some(redirected) = redirected.take() {
819 let (addr, is_asking) = match redirected {
820 Redirect::Moved(addr) => (addr, false),
821 Redirect::Ask(addr) => (addr, true),
822 };
823 let mut conn = self.get_connection_by_addr(&mut connections, &addr);
824 if is_asking {
825 conn = conn.and_then(|conn| {
829 conn.req_packed_command(&b"*1\r\n$6\r\nASKING\r\n"[..])
830 .and_then(|value| value.extract_error())?;
831 Ok(conn)
832 });
833 }
834 (addr, conn)
835 } else {
836 match &single_node_routing {
837 SingleNodeRoutingInfo::Random => {
838 get_random_connection_or_error(&mut connections)
839 }
840 SingleNodeRoutingInfo::SpecificNode(route) => {
841 self.get_connection(&mut connections, route)
842 }
843 SingleNodeRoutingInfo::ByAddress { host, port } => {
844 let address = format!("{host}:{port}").into();
845 let conn = self.get_connection_by_addr(&mut connections, &address);
846 (address, conn)
847 }
848 SingleNodeRoutingInfo::RandomPrimary => {
849 self.get_connection(&mut connections, &Route::new_random_primary())
850 }
851 }
852 };
853 (addr, conn.and_then(|conn| input.send(conn)))
854 };
855
856 match rv {
857 Ok(rv) => return Ok(rv),
858 Err(err) => {
859 if err.kind() == ErrorKind::ClusterConnectionNotFound
860 && *self.auto_reconnect.borrow()
861 {
862 for node in &self.initial_nodes {
863 let addr = node.addr.to_string().into();
864 if let Ok(mut conn) = self.connect(&addr) {
865 if conn.check_connection() {
866 self.connections.borrow_mut().insert(addr, conn);
867 }
868 }
869 }
870 self.refresh_slots()?;
871 }
872
873 if retries == self.cluster_params.retry_params.number_of_retries {
874 return Err(err);
875 }
876 retries += 1;
877
878 match err.retry_method() {
879 RetryMethod::AskRedirect => {
880 redirected = err
881 .redirect_node()
882 .map(|(node, _slot)| Redirect::Ask(node.into()));
883 }
884 RetryMethod::MovedRedirect => {
885 self.refresh_slots()?;
887 redirected = err
889 .redirect_node()
890 .map(|(node, _slot)| Redirect::Moved(node.into()));
891 }
892 RetryMethod::WaitAndRetry => {
893 let sleep_time = self
895 .cluster_params
896 .retry_params
897 .wait_time_for_retry(retries);
898 thread::sleep(sleep_time);
899 }
900 RetryMethod::Reconnect => {
901 if *self.auto_reconnect.borrow() {
902 self.connections.borrow_mut().remove(&addr);
904 if let Ok(mut conn) = self.connect(&addr) {
905 if conn.check_connection() {
906 self.connections.borrow_mut().insert(addr, conn);
907 }
908 }
909 }
910 }
911 RetryMethod::NoRetry => {
912 return Err(err);
913 }
914 RetryMethod::RetryImmediately => {}
915 RetryMethod::ReconnectFromInitialConnections => {
916 if *self.auto_reconnect.borrow() {
918 if let Ok(mut conn) = self.connect(&addr) {
919 if conn.check_connection() {
920 self.connections.borrow_mut().insert(addr, conn);
921 }
922 }
923 }
924 }
925 }
926 }
927 }
928 }
929 }
930
931 fn send_recv_and_retry_cmds(&self, cmds: &[Cmd]) -> RedisResult<Vec<Value>> {
932 let mut results = vec![Value::Nil; cmds.len()];
936
937 let to_retry = self
938 .send_all_commands(cmds)
939 .and_then(|node_cmds| self.recv_all_commands(&mut results, &node_cmds))?;
940
941 if to_retry.is_empty() {
942 return Ok(results);
943 }
944
945 self.refresh_slots()?;
947
948 for retry_idx in to_retry {
952 let cmd = &cmds[retry_idx];
953 let routing = RoutingInfo::for_routable(cmd);
954 results[retry_idx] = self.request(Input::Cmd(cmd), routing)?.into();
955 }
956 Ok(results)
957 }
958
959 fn send_all_commands(&self, cmds: &[Cmd]) -> RedisResult<Vec<NodeCmd>> {
961 let mut connections = self.connections.borrow_mut();
962
963 let node_cmds = self.map_cmds_to_nodes(cmds)?;
964 for nc in &node_cmds {
965 self.get_connection_by_addr(&mut connections, &nc.addr)?
966 .send_packed_command(&nc.pipe)?;
967 }
968 Ok(node_cmds)
969 }
970
971 fn recv_all_commands(
973 &self,
974 results: &mut [Value],
975 node_cmds: &[NodeCmd],
976 ) -> RedisResult<Vec<usize>> {
977 let mut to_retry = Vec::new();
978 let mut connections = self.connections.borrow_mut();
979 let mut first_err = None;
980
981 for nc in node_cmds {
982 for cmd_idx in &nc.indexes {
983 match self
984 .get_connection_by_addr(&mut connections, &nc.addr)?
985 .recv_response()
986 {
987 Ok(item) => results[*cmd_idx] = item,
988 Err(err) if err.is_cluster_error() => to_retry.push(*cmd_idx),
989 Err(err) => first_err = first_err.or(Some(err)),
990 }
991 }
992 }
993 match first_err {
994 Some(err) => Err(err),
995 None => Ok(to_retry),
996 }
997 }
998
999 pub fn route_command(&mut self, cmd: &Cmd, routing: RoutingInfo) -> RedisResult<Value> {
1001 self.request(Input::Cmd(cmd), Some(routing))
1002 .map(|res| res.into())
1003 }
1004}
1005
1006const MULTI: &[u8] = "*1\r\n$5\r\nMULTI\r\n".as_bytes();
1007impl<C: Connect + ConnectionLike> ConnectionLike for ClusterConnection<C> {
1008 fn supports_pipelining(&self) -> bool {
1009 false
1010 }
1011
1012 fn req_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
1013 if cmd.is_empty() {
1014 return Err(RedisError::make_empty_command());
1015 }
1016 let routing = RoutingInfo::for_routable(cmd);
1017 self.request(Input::Cmd(cmd), routing).map(|res| res.into())
1018 }
1019
1020 fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
1021 if cmd.is_empty() {
1022 return Err(RedisError::make_empty_command());
1023 }
1024 let actual_cmd = if cmd.starts_with(MULTI) {
1025 &cmd[MULTI.len()..]
1026 } else {
1027 cmd
1028 };
1029 let value = parse_redis_value(actual_cmd)?;
1030 let routing = RoutingInfo::for_routable(&value);
1031 self.request(
1032 Input::Slice {
1033 cmd,
1034 routable: value,
1035 },
1036 routing,
1037 )
1038 .map(|res| res.into())
1039 }
1040
1041 fn req_packed_commands(
1042 &mut self,
1043 cmd: &[u8],
1044 offset: usize,
1045 count: usize,
1046 ) -> RedisResult<Vec<Value>> {
1047 if cmd.is_empty() {
1048 return Err(RedisError::make_empty_command());
1049 }
1050 let actual_cmd = if cmd.starts_with(MULTI) {
1051 &cmd[MULTI.len()..]
1052 } else {
1053 cmd
1054 };
1055 let value = parse_redis_value(actual_cmd)?;
1056 let route = match RoutingInfo::for_routable(&value) {
1057 Some(RoutingInfo::MultiNode(_)) => None,
1059 Some(RoutingInfo::SingleNode(route)) => Some(route),
1060 None => None,
1061 }
1062 .unwrap_or(SingleNodeRoutingInfo::Random);
1063 self.request(
1064 Input::Commands { cmd, offset, count },
1065 Some(RoutingInfo::SingleNode(route)),
1066 )
1067 .map(|res| res.into())
1068 }
1069
1070 fn get_db(&self) -> i64 {
1071 0
1072 }
1073
1074 fn is_open(&self) -> bool {
1075 let connections = self.connections.borrow();
1076 for conn in connections.values() {
1077 if !conn.is_open() {
1078 return false;
1079 }
1080 }
1081 true
1082 }
1083
1084 fn check_connection(&mut self) -> bool {
1085 let mut connections = self.connections.borrow_mut();
1086 for conn in connections.values_mut() {
1087 if !conn.check_connection() {
1088 return false;
1089 }
1090 }
1091 true
1092 }
1093}
1094
1095#[derive(Debug)]
1096struct NodeCmd {
1097 indexes: Vec<usize>,
1099 pipe: Vec<u8>,
1100 addr: ArcStr,
1101}
1102
1103impl NodeCmd {
1104 fn new(a: ArcStr) -> NodeCmd {
1105 NodeCmd {
1106 indexes: vec![],
1107 pipe: vec![],
1108 addr: a,
1109 }
1110 }
1111}
1112
1113fn get_random_connection<C: ConnectionLike + Connect + Sized>(
1114 connections: &mut HashMap<ArcStr, C>,
1115) -> Option<(ArcStr, &mut C)> {
1116 connections
1117 .iter_mut()
1118 .choose(&mut rng())
1119 .map(|(addr, conn)| (addr.clone(), conn))
1120}
1121
1122fn get_random_connection_or_error<C: ConnectionLike + Connect + Sized>(
1123 connections: &mut HashMap<ArcStr, C>,
1124) -> (ArcStr, RedisResult<&mut C>) {
1125 match get_random_connection(connections) {
1126 Some((addr, conn)) => (addr, Ok(conn)),
1127 None => (
1128 String::new().into(),
1130 Err(RedisError::from((
1131 ErrorKind::ClusterConnectionNotFound,
1132 "No connections found",
1133 ))),
1134 ),
1135 }
1136}
1137
1138#[cfg(test)]
1139mod tests {
1140 use crate::ConnectionAddr;
1141
1142 use super::*;
1143
1144 #[test]
1145 fn parse_cluster_node_host_port() {
1146 let cases = vec![
1147 (
1148 "127.0.0.1:6379",
1149 ConnectionAddr::Tcp("127.0.0.1".to_string(), 6379u16),
1150 ),
1151 (
1152 "localhost.localdomain:6379",
1153 ConnectionAddr::Tcp("localhost.localdomain".to_string(), 6379u16),
1154 ),
1155 (
1156 "dead::cafe:beef:30001",
1157 ConnectionAddr::Tcp("dead::cafe:beef".to_string(), 30001u16),
1158 ),
1159 (
1160 "[fe80::cafe:beef%en1]:30001",
1161 ConnectionAddr::Tcp("fe80::cafe:beef%en1".to_string(), 30001u16),
1162 ),
1163 ];
1164
1165 for (input, expected) in cases {
1166 let res = get_connection_info(input, &ClusterParams::default());
1167 assert_eq!(res.unwrap().addr, expected);
1168 }
1169
1170 let cases = vec![":0", "[]:6379"];
1171 for input in cases {
1172 let res = get_connection_info(input, &ClusterParams::default());
1173 assert_eq!(
1174 res.err(),
1175 Some(RedisError::from((
1176 ErrorKind::InvalidClientConfig,
1177 "Invalid node string",
1178 ))),
1179 );
1180 }
1181 }
1182}