1use std::{
94 collections::HashMap,
95 fmt,
96 future::Future,
97 io, mem,
98 pin::Pin,
99 sync::{Arc, Mutex},
100 task::{self, Poll},
101 time::Duration,
102};
103
104mod request;
105mod routing;
106use crate::{
107 aio::{check_resp3, ConnectionLike, HandleContainer, MultiplexedConnection, Runtime},
108 cluster::{get_connection_info, slot_cmd},
109 cluster_client::ClusterParams,
110 cluster_routing::{
111 MultipleNodeRoutingInfo, Redirect, ResponsePolicy, RoutingInfo, SingleNodeRoutingInfo,
112 Slot, SlotMap,
113 },
114 cluster_topology::parse_slots,
115 cmd,
116 subscription_tracker::SubscriptionTracker,
117 types::closed_connection_error,
118 AsyncConnectionConfig, Cmd, ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError,
119 RedisFuture, RedisResult, ToRedisArgs, Value,
120};
121
122use crate::ProtocolVersion;
123use futures_sink::Sink;
124use futures_util::{
125 future::{self, BoxFuture, FutureExt},
126 ready,
127 stream::{self, Stream, StreamExt},
128};
129use log::{debug, trace, warn};
130use rand::{rng, seq::IteratorRandom};
131use request::{CmdArg, PendingRequest, Request, RequestState, Retry};
132use routing::{route_for_pipeline, InternalRoutingInfo, InternalSingleNodeRouting};
133use tokio::sync::{mpsc, oneshot, RwLock};
134
/// State owned by the client-facing handle and shared, through an `Arc`, by every clone
/// of a `ClusterConnection`.
struct ClientSideState {
    /// Protocol version; the pub/sub methods pass it to `check_resp3!`.
    protocol: ProtocolVersion,
    /// Container for the handle of the spawned task that forwards messages to the inner
    /// connection. Never read directly; it is only stored here.
    _task_handle: HandleContainer,
    /// Optional timeout applied to each routed command or pipeline.
    response_timeout: Option<Duration>,
    /// Runtime used to spawn the forwarding task and to apply `response_timeout`.
    runtime: Runtime,
}
141
/// Cloneable client-facing handle to a cluster connection.
///
/// Requests are wrapped in a `Message` and pushed through `sender`; the spawned task
/// created in `new` forwards them to the inner connection.
#[derive(Clone)]
pub struct ClusterConnection<C = MultiplexedConnection> {
    /// Shared client-side state (protocol, timeout, runtime, task handle).
    state: Arc<ClientSideState>,
    /// Sending half of the queue that feeds the forwarding task.
    sender: mpsc::Sender<Message<C>>,
}
151
152impl<C> ClusterConnection<C>
153where
154 C: ConnectionLike + Connect + Clone + Send + Sync + Unpin + 'static,
155{
    /// Creates the inner cluster connection, spawns a task that forwards queued messages
    /// into it, and returns the client-facing handle.
    ///
    /// # Errors
    ///
    /// Returns whatever error `ClusterConnInner::new` produces.
    pub(crate) async fn new(
        initial_nodes: &[ConnectionInfo],
        cluster_params: ClusterParams,
    ) -> RedisResult<ClusterConnection<C>> {
        // Read these before `cluster_params` is moved into the inner connection.
        let protocol = cluster_params.protocol.unwrap_or_default();
        let response_timeout = cluster_params.response_timeout;
        let runtime = Runtime::locate();
        ClusterConnInner::new(initial_nodes, cluster_params)
            .await
            .map(|inner| {
                // Bounded queue (capacity 100) between the handles and the forwarding task.
                let (sender, mut receiver) = mpsc::channel::<Message<_>>(100);
                // Drain the queue into the inner connection, which acts as a `Sink`.
                let stream = async move {
                    let _ = stream::poll_fn(move |cx| receiver.poll_recv(cx))
                        .map(Ok)
                        .forward(inner)
                        .await;
                };
                let _task_handle = HandleContainer::new(runtime.spawn(stream));

                ClusterConnection {
                    sender,
                    state: Arc::new(ClientSideState {
                        protocol,
                        _task_handle,
                        response_timeout,
                        runtime,
                    }),
                }
            })
    }
186
187 pub async fn route_command(&mut self, cmd: &Cmd, routing: RoutingInfo) -> RedisResult<Value> {
189 trace!("send_packed_command");
190 let (sender, receiver) = oneshot::channel();
191 let request = async {
192 self.sender
193 .send(Message {
194 cmd: CmdArg::Cmd {
195 cmd: Arc::new(cmd.clone()), routing: routing.into(),
197 },
198 sender,
199 })
200 .await
201 .map_err(|_| {
202 RedisError::from(io::Error::new(
203 io::ErrorKind::BrokenPipe,
204 "redis_cluster: Unable to send command",
205 ))
206 })?;
207
208 receiver
209 .await
210 .unwrap_or_else(|_| {
211 Err(RedisError::from(io::Error::new(
212 io::ErrorKind::BrokenPipe,
213 "redis_cluster: Unable to receive command",
214 )))
215 })
216 .map(|response| match response {
217 Response::Single(value) => value,
218 Response::Multiple(_) => unreachable!(),
219 })
220 };
221
222 match self.state.response_timeout {
223 Some(duration) => self.state.runtime.timeout(duration, request).await?,
224 None => request.await,
225 }
226 }
227
228 pub async fn route_pipeline<'a>(
230 &'a mut self,
231 pipeline: &'a crate::Pipeline,
232 offset: usize,
233 count: usize,
234 route: SingleNodeRoutingInfo,
235 ) -> RedisResult<Vec<Value>> {
236 let (sender, receiver) = oneshot::channel();
237
238 let request = async {
239 self.sender
240 .send(Message {
241 cmd: CmdArg::Pipeline {
242 pipeline: Arc::new(pipeline.clone()), offset,
244 count,
245 route: route.into(),
246 },
247 sender,
248 })
249 .await
250 .map_err(|_| closed_connection_error())?;
251 receiver
252 .await
253 .unwrap_or_else(|_| Err(closed_connection_error()))
254 .map(|response| match response {
255 Response::Multiple(values) => values,
256 Response::Single(_) => unreachable!(),
257 })
258 };
259
260 match self.state.response_timeout {
261 Some(duration) => self.state.runtime.timeout(duration, request).await?,
262 None => request.await,
263 }
264 }
265
266 pub async fn subscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
275 check_resp3!(self.state.protocol);
276 let mut cmd = cmd("SUBSCRIBE");
277 cmd.arg(channel_name);
278 cmd.exec_async(self).await?;
279 Ok(())
280 }
281
282 pub async fn unsubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
286 check_resp3!(self.state.protocol);
287 let mut cmd = cmd("UNSUBSCRIBE");
288 cmd.arg(channel_name);
289 cmd.exec_async(self).await?;
290 Ok(())
291 }
292
293 pub async fn psubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> {
302 check_resp3!(self.state.protocol);
303 let mut cmd = cmd("PSUBSCRIBE");
304 cmd.arg(channel_pattern);
305 cmd.exec_async(self).await?;
306 Ok(())
307 }
308
309 pub async fn punsubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> {
313 check_resp3!(self.state.protocol);
314 let mut cmd = cmd("PUNSUBSCRIBE");
315 cmd.arg(channel_pattern);
316 cmd.exec_async(self).await?;
317 Ok(())
318 }
319
320 pub async fn ssubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
329 check_resp3!(self.state.protocol);
330 let mut cmd = cmd("SSUBSCRIBE");
331 cmd.arg(channel_name);
332 cmd.exec_async(self).await?;
333 Ok(())
334 }
335
336 pub async fn sunsubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
340 check_resp3!(self.state.protocol);
341 let mut cmd = cmd("SUNSUBSCRIBE");
342 cmd.arg(channel_name);
343 cmd.exec_async(self).await?;
344 Ok(())
345 }
346}
347
/// A cloneable future that resolves to a node connection.
type ConnectionFuture<C> = future::Shared<BoxFuture<'static, C>>;
/// Node connections keyed by a `String` address.
type ConnectionMap<C> = HashMap<String, ConnectionFuture<C>>;
350
/// State shared between the connection driver and the request futures it creates.
struct InnerCore<C> {
    /// Connection map and slot map, guarded together by one async read/write lock.
    conn_lock: RwLock<(ConnectionMap<C>, SlotMap)>,
    /// Parameters used when connecting to nodes and when retrying requests.
    cluster_params: ClusterParams,
    /// Requests queued for dispatch; drained by `poll_complete`.
    pending_requests: Mutex<Vec<PendingRequest<C>>>,
    /// The nodes supplied at construction; reused by `reconnect_to_initial_nodes`.
    initial_nodes: Vec<ConnectionInfo>,
    /// Present only when a push sender was configured; used to replay subscriptions.
    subscription_tracker: Option<Mutex<SubscriptionTracker>>,
}
361
/// Reference-counted handle to the shared `InnerCore`.
type Core<C> = Arc<InnerCore<C>>;
363
/// The connection driver: implements `Sink<Message<C>>` and runs requests against nodes.
struct ClusterConnInner<C> {
    /// State shared with the request futures.
    inner: Core<C>,
    /// Whether the driver is operating normally or running a recovery future.
    state: ConnectionState,
    /// Requests that have been dispatched and are currently being polled.
    #[allow(clippy::complexity)]
    in_flight_requests: stream::FuturesUnordered<Pin<Box<Request<C>>>>,
    /// Error from the latest failed recovery, waiting to be handed to a request.
    refresh_error: Option<RedisError>,
}
374
375fn boxed_sleep(duration: Duration) -> BoxFuture<'static, ()> {
376 Box::pin(Runtime::locate_and_sleep(duration))
377}
378
/// Reply delivered to a waiting caller.
#[derive(Debug, PartialEq)]
pub(crate) enum Response {
    /// Reply to a single command.
    Single(Value),
    /// Replies to a pipeline.
    Multiple(Vec<Value>),
}
384
/// Identifies what a failed operation was aimed at.
enum OperationTarget {
    /// A specific node, identified by its address.
    Node { address: String },
    /// No connection could be obtained for the request.
    NotFound,
    /// A multi-node operation.
    FanOut,
}
/// Result of one attempt: the response, or the failing target together with the error.
type OperationResult = Result<Response, (OperationTarget, RedisError)>;
391
392impl From<String> for OperationTarget {
393 fn from(address: String) -> Self {
394 OperationTarget::Node { address }
395 }
396}
397
/// A request travelling from a `ClusterConnection` handle to the connection driver.
struct Message<C> {
    /// The command or pipeline to run, including its routing.
    cmd: CmdArg<C>,
    /// Channel on which the result is sent back to the caller.
    sender: oneshot::Sender<RedisResult<Response>>,
}
402
/// A recovery operation in progress.
enum RecoverFuture {
    /// Slot refresh; may fail, in which case `poll_recover` starts a new one.
    RecoverSlots(BoxFuture<'static, RedisResult<()>>),
    /// Reconnection to a set of nodes or to the initial nodes; has no error output.
    Reconnect(BoxFuture<'static, ()>),
}
407
/// Operating mode of the connection driver.
enum ConnectionState {
    /// Normal operation: requests are dispatched and polled.
    PollComplete,
    /// A recovery future must finish before requests are processed again.
    Recover(RecoverFuture),
}
412
413impl fmt::Debug for ConnectionState {
414 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
415 write!(
416 f,
417 "{}",
418 match self {
419 ConnectionState::PollComplete => "PollComplete",
420 ConnectionState::Recover(_) => "Recover",
421 }
422 )
423 }
424}
425
426impl<C> ClusterConnInner<C>
427where
428 C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
429{
430 async fn new(
431 initial_nodes: &[ConnectionInfo],
432 cluster_params: ClusterParams,
433 ) -> RedisResult<Self> {
434 let connections = Self::create_initial_connections(initial_nodes, &cluster_params).await?;
435 let subscription_tracker = if cluster_params.async_push_sender.is_some() {
436 Some(Mutex::new(SubscriptionTracker::default()))
437 } else {
438 None
439 };
440 let inner = Arc::new(InnerCore {
441 conn_lock: RwLock::new((connections, SlotMap::new(cluster_params.read_from_replicas))),
442 cluster_params,
443 pending_requests: Mutex::new(Vec::new()),
444 initial_nodes: initial_nodes.to_vec(),
445 subscription_tracker,
446 });
447 let connection = ClusterConnInner {
448 inner,
449 in_flight_requests: Default::default(),
450 refresh_error: None,
451 state: ConnectionState::PollComplete,
452 };
453 Self::refresh_slots(connection.inner.clone()).await?;
454 Ok(connection)
455 }
456
457 async fn create_initial_connections(
458 initial_nodes: &[ConnectionInfo],
459 params: &ClusterParams,
460 ) -> RedisResult<ConnectionMap<C>> {
461 let (connections, error) = stream::iter(initial_nodes.iter().cloned())
462 .map(|info| {
463 let params = params.clone();
464 async move {
465 let addr = info.addr.to_string();
466 let result = connect_and_check(&addr, params).await;
467 match result {
468 Ok(conn) => Ok((addr, async { conn }.boxed().shared())),
469 Err(e) => {
470 debug!("Failed to connect to initial node: {:?}", e);
471 Err(e)
472 }
473 }
474 }
475 })
476 .buffer_unordered(initial_nodes.len())
477 .fold(
478 (ConnectionMap::<C>::with_capacity(initial_nodes.len()), None),
479 |(mut connections, mut error), result| async move {
480 match result {
481 Ok((addr, conn)) => {
482 connections.insert(addr, conn);
483 }
484 Err(err) => {
485 error = Some(err);
488 }
489 }
490 (connections, error)
491 },
492 )
493 .await;
494 if connections.is_empty() {
495 if let Some(err) = error {
496 return Err(RedisError::from((
497 ErrorKind::IoError,
498 "Failed to create initial connections",
499 err.to_string(),
500 )));
501 } else {
502 return Err(RedisError::from((
503 ErrorKind::IoError,
504 "Failed to create initial connections",
505 )));
506 }
507 }
508 Ok(connections)
509 }
510
511 fn resubscribe(&self) {
512 let Some(subscription_tracker) = self.inner.subscription_tracker.as_ref() else {
513 return;
514 };
515
516 let subscription_pipe = subscription_tracker
517 .lock()
518 .unwrap()
519 .get_subscription_pipeline();
520
521 let requests = subscription_pipe.cmd_iter().map(|cmd| {
523 let routing = RoutingInfo::for_routable(cmd)
524 .unwrap_or(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
525 .into();
526 PendingRequest {
527 retry: 0,
528 sender: request::ResultExpectation::Internal,
529 cmd: CmdArg::Cmd {
530 cmd: Arc::new(cmd.clone()),
531 routing,
532 },
533 }
534 });
535 self.inner.pending_requests.lock().unwrap().extend(requests);
536 }
537
    /// Returns a future that replaces all connections with fresh ones to the initial
    /// nodes, resets the slot map, and then refreshes the slots.
    ///
    /// Failures are only logged: if the initial nodes cannot be reached the existing state
    /// is left untouched, and a failed slot refresh is reported with a warning.
    fn reconnect_to_initial_nodes(&mut self) -> impl Future<Output = ()> {
        debug!("Received request to reconnect to initial nodes");
        let inner = self.inner.clone();
        async move {
            let connection_map =
                match Self::create_initial_connections(&inner.initial_nodes, &inner.cluster_params)
                    .await
                {
                    Ok(map) => map,
                    Err(err) => {
                        warn!("Can't reconnect to initial nodes: `{err}`");
                        return;
                    }
                };
            let mut write_lock = inner.conn_lock.write().await;
            *write_lock = (
                connection_map,
                SlotMap::new(inner.cluster_params.read_from_replicas),
            );
            // Release the lock before `refresh_slots`, which takes the write lock itself.
            drop(write_lock);
            if let Err(err) = Self::refresh_slots(inner.clone()).await {
                warn!("Can't refresh slots with initial nodes: `{err}`");
            };
        }
    }
563
564 fn refresh_connections(&mut self, addrs: Vec<String>) -> impl Future<Output = ()> {
565 let inner = self.inner.clone();
566 async move {
567 let mut write_guard = inner.conn_lock.write().await;
568 let mut connections = stream::iter(addrs)
569 .fold(
570 mem::take(&mut write_guard.0),
571 |mut connections, addr| async {
572 let conn = Self::get_or_create_conn(
573 &addr,
574 connections.remove(&addr),
575 &inner.cluster_params,
576 )
577 .await;
578 if let Ok(conn) = conn {
579 connections.insert(addr, async { conn }.boxed().shared());
580 }
581 connections
582 },
583 )
584 .await;
585 write_guard.0 = mem::take(&mut connections);
586 }
587 }
588
    /// Rebuilds the slot map from one of the known nodes and then reconciles the
    /// connection map with the nodes the new slot map references.
    ///
    /// Nodes are queried with `slot_cmd()` in map-iteration order; the first reply that
    /// parses successfully is used. The write lock on `conn_lock` is held for the whole
    /// operation.
    ///
    /// # Errors
    ///
    /// Returns the most recent error if no node produced a usable reply. In that case the
    /// connection map, which was taken out of the lock at the start, is left empty.
    ///
    /// # Panics
    ///
    /// Panics if a connection-map key contains no `:`.
    async fn refresh_slots(inner: Core<C>) -> RedisResult<()> {
        let mut write_guard = inner.conn_lock.write().await;
        // Take ownership of the map; it is rebuilt and stored back at the end.
        let mut connections = mem::take(&mut write_guard.0);
        let slots = &mut write_guard.1;
        // Stays `Ok` if there are no connections to ask.
        let mut result = Ok(());
        for (addr, conn) in connections.iter_mut() {
            let mut conn = conn.clone().await;
            let value = match conn
                .req_packed_command(&slot_cmd())
                .await
                .and_then(|value| value.extract_error())
            {
                Ok(value) => value,
                Err(err) => {
                    result = Err(err);
                    continue;
                }
            };
            match parse_slots(
                value,
                inner.cluster_params.tls,
                // Everything before the last `:` of the address is passed to `parse_slots`.
                addr.rsplit_once(':').unwrap().0,
            )
            .and_then(|v: Vec<Slot>| Self::build_slot_map(slots, v))
            {
                Ok(_) => {
                    result = Ok(());
                    break;
                }
                Err(err) => result = Err(err),
            }
        }
        result?;

        // Unique addresses referenced by the slot map.
        let mut nodes = write_guard.1.values().flatten().collect::<Vec<_>>();
        nodes.sort_unstable();
        nodes.dedup();
        let nodes_len = nodes.len();
        let addresses_and_connections_iter = nodes
            .into_iter()
            .map(|addr| (addr, connections.remove(addr)));

        // Reuse existing connections where possible and create the missing ones; nodes
        // that cannot be connected to are left out of the new map.
        write_guard.0 = stream::iter(addresses_and_connections_iter)
            .fold(
                HashMap::with_capacity(nodes_len),
                |mut connections, (addr, connection)| async {
                    let conn =
                        Self::get_or_create_conn(addr, connection, &inner.cluster_params).await;
                    if let Ok(conn) = conn {
                        connections.insert(addr.to_string(), async { conn }.boxed().shared());
                    }
                    connections
                },
            )
            .await;

        Ok(())
    }
648
    /// Replaces the contents of `slot_map` with `slots_data` and traces the result.
    ///
    /// Always returns `Ok(())`.
    fn build_slot_map(slot_map: &mut SlotMap, slots_data: Vec<Slot>) -> RedisResult<()> {
        slot_map.clear();
        slot_map.fill_slots(slots_data);
        trace!("{:?}", slot_map);
        Ok(())
    }
655
656 async fn aggregate_results(
657 receivers: Vec<(String, oneshot::Receiver<RedisResult<Response>>)>,
658 routing: &MultipleNodeRoutingInfo,
659 response_policy: Option<ResponsePolicy>,
660 ) -> RedisResult<Value> {
661 if receivers.is_empty() {
662 return Err((
663 ErrorKind::ClusterConnectionNotFound,
664 "No nodes found for multi-node operation",
665 )
666 .into());
667 }
668
669 let extract_result = |response| match response {
670 Response::Single(value) => value,
671 Response::Multiple(_) => unreachable!(),
672 };
673
674 let convert_result = |res: Result<RedisResult<Response>, _>| {
675 res.map_err(|_| RedisError::from((ErrorKind::ResponseError, "request wasn't handled due to internal failure"))) .and_then(|res| res.map(extract_result))
677 };
678
679 let get_receiver = |(_, receiver): (_, oneshot::Receiver<RedisResult<Response>>)| async {
680 convert_result(receiver.await)
681 };
682
683 match response_policy {
685 Some(ResponsePolicy::AllSucceeded) => {
686 future::try_join_all(receivers.into_iter().map(get_receiver))
687 .await
688 .and_then(|mut results| {
689 results.pop().ok_or(
690 (
691 ErrorKind::ClusterConnectionNotFound,
692 "No results received for multi-node operation",
693 )
694 .into(),
695 )
696 })
697 }
698 Some(ResponsePolicy::OneSucceeded) => future::select_ok(
699 receivers
700 .into_iter()
701 .map(|tuple| Box::pin(get_receiver(tuple))),
702 )
703 .await
704 .map(|(result, _)| result),
705 Some(ResponsePolicy::OneSucceededNonEmpty) => {
706 future::select_ok(receivers.into_iter().map(|(_, receiver)| {
707 Box::pin(async move {
708 let result = convert_result(receiver.await)?;
709 match result {
710 Value::Nil => Err((ErrorKind::ResponseError, "no value found").into()),
711 _ => Ok(result),
712 }
713 })
714 }))
715 .await
716 .map(|(result, _)| result)
717 }
718 Some(ResponsePolicy::Aggregate(op)) => {
719 future::try_join_all(receivers.into_iter().map(get_receiver))
720 .await
721 .and_then(|results| crate::cluster_routing::aggregate(results, op))
722 }
723 Some(ResponsePolicy::AggregateLogical(op)) => {
724 future::try_join_all(receivers.into_iter().map(get_receiver))
725 .await
726 .and_then(|results| crate::cluster_routing::logical_aggregate(results, op))
727 }
728 Some(ResponsePolicy::CombineArrays) => {
729 future::try_join_all(receivers.into_iter().map(get_receiver))
730 .await
731 .and_then(|results| match routing {
732 MultipleNodeRoutingInfo::MultiSlot(vec) => {
733 crate::cluster_routing::combine_and_sort_array_results(
734 results,
735 vec.iter().map(|(_, indices)| indices),
736 )
737 }
738 _ => crate::cluster_routing::combine_array_results(results),
739 })
740 }
741 Some(ResponsePolicy::Special) | None => {
742 future::try_join_all(receivers.into_iter().map(|(addr, receiver)| async move {
746 let result = convert_result(receiver.await)?;
747 Ok((Value::BulkString(addr.into_bytes()), result))
748 }))
749 .await
750 .map(Value::Map)
751 }
752 }
753 }
754
    /// Fans `cmd` out to the nodes selected by `routing` and aggregates the replies.
    ///
    /// One pending request with its own reply channel is queued per target node; nodes
    /// without a known connection are skipped. The queued requests are run by the
    /// connection driver, and this function waits on the reply channels.
    ///
    /// # Errors
    ///
    /// Errors are tagged with `OperationTarget::FanOut`. Returns
    /// `ClusterConnectionNotFound` if the connection map is empty, otherwise whatever
    /// `aggregate_results` returns.
    ///
    /// # Panics
    ///
    /// Panics if the slot map yields more multi-slot entries than `routes` contains.
    async fn execute_on_multiple_nodes<'a>(
        cmd: &'a Arc<Cmd>,
        routing: &'a MultipleNodeRoutingInfo,
        core: Core<C>,
        response_policy: Option<ResponsePolicy>,
    ) -> OperationResult {
        let read_guard = core.conn_lock.read().await;
        if read_guard.0.is_empty() {
            return OperationResult::Err((
                OperationTarget::FanOut,
                (
                    ErrorKind::ClusterConnectionNotFound,
                    "No connections found for multi-node operation",
                )
                    .into(),
            ));
        }
        let (receivers, requests): (Vec<_>, Vec<_>) = {
            // Builds the (reply receiver, pending request) pair for one node, or `None`
            // when there is no connection for `addr`.
            let to_request = |(addr, cmd): (&str, Arc<Cmd>)| {
                read_guard.0.get(addr).cloned().map(|conn| {
                    let (sender, receiver) = oneshot::channel();
                    let addr = addr.to_string();
                    (
                        (addr.clone(), receiver),
                        PendingRequest {
                            retry: 0,
                            sender: request::ResultExpectation::External(sender),
                            cmd: CmdArg::Cmd {
                                cmd,
                                routing: InternalSingleNodeRouting::Connection {
                                    identifier: addr,
                                    conn,
                                }
                                .into(),
                            },
                        },
                    )
                })
            };
            let slot_map = &read_guard.1;

            match routing {
                MultipleNodeRoutingInfo::AllNodes => slot_map
                    .addresses_for_all_nodes()
                    .into_iter()
                    .filter_map(|addr| to_request((addr, cmd.clone())))
                    .unzip(),
                MultipleNodeRoutingInfo::AllMasters => slot_map
                    .addresses_for_all_primaries()
                    .into_iter()
                    .filter_map(|addr| to_request((addr, cmd.clone())))
                    .unzip(),
                MultipleNodeRoutingInfo::MultiSlot(routes) => slot_map
                    .addresses_for_multi_slot(routes)
                    .enumerate()
                    .filter_map(|(index, addr_opt)| {
                        addr_opt.and_then(|addr| {
                            // Each node receives a command built from its own indices.
                            let (_, indices) = routes.get(index).unwrap();
                            let cmd =
                                Arc::new(crate::cluster_routing::command_for_multi_slot_indices(
                                    cmd.as_ref(),
                                    indices.iter(),
                                ));
                            to_request((addr, cmd))
                        })
                    })
                    .unzip(),
            }
        };
        // Release the read lock before queueing and waiting.
        drop(read_guard);
        core.pending_requests.lock().unwrap().extend(requests);

        Self::aggregate_results(receivers, routing, response_policy)
            .await
            .map(Response::Single)
            .map_err(|err| (OperationTarget::FanOut, err))
    }
834
835 async fn try_cmd_request(
836 cmd: Arc<Cmd>,
837 routing: InternalRoutingInfo<C>,
838 core: Core<C>,
839 ) -> OperationResult {
840 let route = match routing {
841 InternalRoutingInfo::SingleNode(single_node_routing) => single_node_routing,
842 InternalRoutingInfo::MultiNode((multi_node_routing, response_policy)) => {
843 return Self::execute_on_multiple_nodes(
844 &cmd,
845 &multi_node_routing,
846 core,
847 response_policy,
848 )
849 .await;
850 }
851 };
852
853 match Self::get_connection(route, core).await {
854 Ok((addr, mut conn)) => conn
855 .req_packed_command(&cmd)
856 .await
857 .and_then(|value| value.extract_error())
858 .map(Response::Single)
859 .map_err(|err| (addr.into(), err)),
860 Err(err) => Err((OperationTarget::NotFound, err)),
861 }
862 }
863
864 async fn try_pipeline_request(
865 pipeline: Arc<crate::Pipeline>,
866 offset: usize,
867 count: usize,
868 conn: impl Future<Output = RedisResult<(String, C)>>,
869 ) -> OperationResult {
870 match conn.await {
871 Ok((addr, mut conn)) => conn
872 .req_packed_commands(&pipeline, offset, count)
873 .await
874 .and_then(Value::extract_error_vec)
875 .map(Response::Multiple)
876 .map_err(|err| (OperationTarget::Node { address: addr }, err)),
877 Err(err) => Err((OperationTarget::NotFound, err)),
878 }
879 }
880
    /// Runs one request attempt, dispatching on whether it is a command or a pipeline.
    async fn try_request(cmd: CmdArg<C>, core: Core<C>) -> OperationResult {
        match cmd {
            CmdArg::Cmd { cmd, routing } => Self::try_cmd_request(cmd, routing, core).await,
            CmdArg::Pipeline {
                pipeline,
                offset,
                count,
                route,
            } => {
                // The connection future is passed unawaited; the callee awaits it.
                Self::try_pipeline_request(
                    pipeline,
                    offset,
                    count,
                    Self::get_connection(route, core),
                )
                .await
            }
        }
    }
900
    /// Resolves `route` to a node address and a connection to that node.
    ///
    /// * `Connection` routes return the supplied connection directly.
    /// * `Redirect` routes are delegated to `get_redirected_connection`.
    /// * `ByAddress` routes require an existing connection for the address.
    /// * `SpecificNode` routes look the address up in the slot map and connect on demand.
    /// * `Random` routes, and any route that could not be resolved or connected, fall back
    ///   to a randomly chosen existing connection.
    ///
    /// # Errors
    ///
    /// Returns `ClientError` when a `ByAddress` connection is missing,
    /// `ClusterConnectionNotFound` when the connection map is empty, or the error from
    /// `get_redirected_connection`.
    async fn get_connection(
        route: InternalSingleNodeRouting<C>,
        core: Core<C>,
    ) -> RedisResult<(String, C)> {
        let read_guard = core.conn_lock.read().await;

        // `conn` ends up as `Some((address, existing connection if any))`, or `None` when
        // no address could be determined.
        let conn = match route {
            InternalSingleNodeRouting::Random => None,
            InternalSingleNodeRouting::SpecificNode(route) => read_guard
                .1
                .slot_addr_for_route(&route)
                .map(|addr| addr.to_string()),
            InternalSingleNodeRouting::Connection { identifier, conn } => {
                return Ok((identifier, conn.await));
            }
            InternalSingleNodeRouting::Redirect { redirect, .. } => {
                // The callee takes the lock itself, so release it first.
                drop(read_guard);
                return Self::get_redirected_connection(redirect, core).await;
            }
            InternalSingleNodeRouting::ByAddress(address) => {
                if let Some(conn) = read_guard.0.get(&address).cloned() {
                    return Ok((address, conn.await));
                } else {
                    return Err((
                        ErrorKind::ClientError,
                        "Requested connection not found",
                        address,
                    )
                        .into());
                }
            }
        }
        .map(|addr| {
            let conn = read_guard.0.get(&addr).cloned();
            (addr, conn)
        });
        drop(read_guard);

        let addr_conn_option = match conn {
            Some((addr, Some(conn))) => Some((addr, conn.await)),
            // Address known but not connected: try to connect; a failure is discarded and
            // leads to the random fallback below.
            Some((addr, None)) => connect_check_and_add(core.clone(), addr.clone())
                .await
                .ok()
                .map(|conn| (addr, conn)),
            None => None,
        };

        let (addr, conn) = match addr_conn_option {
            Some(tuple) => tuple,
            None => {
                let read_guard = core.conn_lock.read().await;
                if let Some((random_addr, random_conn_future)) =
                    get_random_connection(&read_guard.0)
                {
                    drop(read_guard);
                    (random_addr, random_conn_future.await)
                } else {
                    return Err(
                        (ErrorKind::ClusterConnectionNotFound, "No connections found").into(),
                    );
                }
            }
        };

        Ok((addr, conn))
    }
968
969 async fn get_redirected_connection(
970 redirect: Redirect,
971 core: Core<C>,
972 ) -> RedisResult<(String, C)> {
973 let asking = matches!(redirect, Redirect::Ask(_));
974 let addr = match redirect {
975 Redirect::Moved(addr) => addr,
976 Redirect::Ask(addr) => addr,
977 };
978 let read_guard = core.conn_lock.read().await;
979 let conn = read_guard.0.get(&addr).cloned();
980 drop(read_guard);
981 let mut conn = match conn {
982 Some(conn) => conn.await,
983 None => connect_check_and_add(core.clone(), addr.clone()).await?,
984 };
985 if asking {
986 let _ = conn
987 .req_packed_command(&crate::cmd::cmd("ASKING"))
988 .await
989 .and_then(|value| value.extract_error());
990 }
991
992 Ok((addr, conn))
993 }
994
    /// Drives the current recovery future, if there is one.
    ///
    /// Returns `Ready(Ok(()))` immediately in the `PollComplete` state. When recovery
    /// finishes successfully the state returns to `PollComplete` and tracked subscriptions
    /// are queued again. When a slot refresh fails, a fresh refresh future is installed
    /// (the state stays `Recover`) and the error is returned.
    fn poll_recover(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), RedisError>> {
        let recover_future = match &mut self.state {
            ConnectionState::PollComplete => return Poll::Ready(Ok(())),
            ConnectionState::Recover(future) => future,
        };
        let res = match recover_future {
            RecoverFuture::RecoverSlots(ref mut future) => match ready!(future.as_mut().poll(cx)) {
                Ok(_) => {
                    trace!("Recovered!");
                    self.state = ConnectionState::PollComplete;
                    Ok(())
                }
                Err(err) => {
                    trace!("Recover slots failed!");
                    // Replace the finished future so the next poll retries the refresh.
                    *future = Box::pin(Self::refresh_slots(self.inner.clone()));
                    Err(err)
                }
            },
            RecoverFuture::Reconnect(ref mut future) => {
                ready!(future.as_mut().poll(cx));
                trace!("Reconnected connections");
                self.state = ConnectionState::PollComplete;
                Ok(())
            }
        };
        if res.is_ok() {
            self.resubscribe();
        }
        Poll::Ready(res)
    }
1025
    /// Moves pending requests into flight, polls all in-flight requests, and reports which
    /// recovery action, if any, the completed requests asked for.
    ///
    /// Returns `Ready` when a recovery action is required or nothing is in flight any
    /// more; otherwise `Pending`.
    fn poll_complete(&mut self, cx: &mut task::Context<'_>) -> Poll<PollFlushAction> {
        let mut poll_flush_action = PollFlushAction::None;

        let mut pending_requests_guard = self.inner.pending_requests.lock().unwrap();
        if !pending_requests_guard.is_empty() {
            let mut pending_requests = mem::take(&mut *pending_requests_guard);
            for request in pending_requests.drain(..) {
                // Skip requests whose sender reports closed.
                if request.sender.is_closed() {
                    continue;
                }

                let future = Self::try_request(request.cmd.clone(), self.inner.clone()).boxed();
                self.in_flight_requests.push(Box::pin(Request {
                    retry_params: self.inner.cluster_params.retry_params.clone(),
                    request: Some(request),
                    future: RequestState::Future { future },
                }));
            }
            // Hand the drained (now empty) vector back.
            *pending_requests_guard = pending_requests;
        }
        drop(pending_requests_guard);

        loop {
            let (request_handling, next) =
                match Pin::new(&mut self.in_flight_requests).poll_next(cx) {
                    Poll::Ready(Some(result)) => result,
                    Poll::Ready(None) | Poll::Pending => break,
                };
            match request_handling {
                // Retry on a later `poll_complete` call.
                Some(Retry::MoveToPending { request }) => {
                    self.inner.pending_requests.lock().unwrap().push(request)
                }
                // Retry right away with a new attempt future.
                Some(Retry::Immediately { request }) => {
                    let future = Self::try_request(request.cmd.clone(), self.inner.clone());
                    self.in_flight_requests.push(Box::pin(Request {
                        retry_params: self.inner.cluster_params.retry_params.clone(),
                        request: Some(request),
                        future: RequestState::Future {
                            future: Box::pin(future),
                        },
                    }));
                }
                // Keep the request in flight, starting in the sleep state.
                Some(Retry::AfterSleep {
                    request,
                    sleep_duration,
                }) => {
                    let future = RequestState::Sleep {
                        sleep: boxed_sleep(sleep_duration),
                    };
                    self.in_flight_requests.push(Box::pin(Request {
                        retry_params: self.inner.cluster_params.retry_params.clone(),
                        request: Some(request),
                        future,
                    }));
                }
                None => {}
            };
            poll_flush_action = poll_flush_action.change_state(next);
        }

        if !matches!(poll_flush_action, PollFlushAction::None) || self.in_flight_requests.is_empty()
        {
            Poll::Ready(poll_flush_action)
        } else {
            Poll::Pending
        }
    }
1096
    /// Hands the stored refresh error, if any, to a single request.
    ///
    /// An in-flight request that still holds its request data is preferred; otherwise the
    /// most recently queued pending request receives the error. If neither exists the
    /// error stays stored.
    fn send_refresh_error(&mut self) {
        if self.refresh_error.is_some() {
            if let Some(mut request) = Pin::new(&mut self.in_flight_requests)
                .iter_pin_mut()
                .find(|request| request.request.is_some())
            {
                (*request)
                    .as_mut()
                    .respond(Err(self.refresh_error.take().unwrap()));
            } else if let Some(request) = self.inner.pending_requests.lock().unwrap().pop() {
                request.sender.send(Err(self.refresh_error.take().unwrap()));
            }
        }
    }
1111
1112 async fn get_or_create_conn(
1113 addr: &str,
1114 conn_option: Option<ConnectionFuture<C>>,
1115 params: &ClusterParams,
1116 ) -> RedisResult<C> {
1117 if let Some(conn) = conn_option {
1118 let mut conn = conn.await;
1119 match check_connection(&mut conn).await {
1120 Ok(_) => Ok(conn),
1121 Err(_) => connect_and_check(addr, params.clone()).await,
1122 }
1123 } else {
1124 connect_and_check(addr, params.clone()).await
1125 }
1126 }
1127}
1128
/// Recovery action requested by completed requests during one `poll_complete` pass.
#[derive(Debug, PartialEq)]
enum PollFlushAction {
    /// Nothing to do.
    None,
    /// Refresh the slot map.
    RebuildSlots,
    /// Re-establish the connections to the listed addresses.
    Reconnect(Vec<String>),
    /// Drop everything and reconnect using the initial nodes.
    ReconnectFromInitialConnections,
}

impl PollFlushAction {
    /// Merges two requested actions into one.
    ///
    /// `ReconnectFromInitialConnections` wins over everything, `RebuildSlots` wins over
    /// `Reconnect`, `None` yields to whatever the other side is, and two `Reconnect`
    /// lists are concatenated with `self`'s addresses first.
    fn change_state(self, next_state: PollFlushAction) -> PollFlushAction {
        match self {
            Self::None => next_state,
            Self::ReconnectFromInitialConnections => Self::ReconnectFromInitialConnections,
            Self::RebuildSlots => match next_state {
                Self::ReconnectFromInitialConnections => Self::ReconnectFromInitialConnections,
                Self::None | Self::RebuildSlots | Self::Reconnect(_) => Self::RebuildSlots,
            },
            Self::Reconnect(mut addrs) => match next_state {
                Self::None => Self::Reconnect(addrs),
                Self::ReconnectFromInitialConnections => Self::ReconnectFromInitialConnections,
                Self::RebuildSlots => Self::RebuildSlots,
                Self::Reconnect(new_addrs) => {
                    addrs.extend(new_addrs);
                    Self::Reconnect(addrs)
                }
            },
        }
    }
}
1158
/// `Sink` implementation through which the forwarding task feeds requests to the driver.
///
/// `start_send` only queues a request; all actual work happens in `poll_flush`.
impl<C> Sink<Message<C>> for ClusterConnInner<C>
where
    C: ConnectionLike + Connect + Clone + Send + Sync + Unpin + 'static,
{
    type Error = ();

    /// Always ready: requests are queued without any capacity limit here.
    fn poll_ready(self: Pin<&mut Self>, _cx: &mut task::Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Records the request in the subscription tracker (when one is configured) and
    /// appends it to the pending queue.
    fn start_send(self: Pin<&mut Self>, msg: Message<C>) -> Result<(), Self::Error> {
        trace!("start_send");
        let Message { cmd, sender } = msg;

        if let Some(tracker) = &self.inner.subscription_tracker {
            let mut tracker = tracker.lock().unwrap();
            match &cmd {
                CmdArg::Cmd { cmd, .. } => tracker.update_with_cmd(cmd.as_ref()),
                CmdArg::Pipeline { pipeline, .. } => {
                    tracker.update_with_pipeline(pipeline.as_ref())
                }
            }
        };

        self.inner
            .pending_requests
            .lock()
            .unwrap()
            .push(PendingRequest {
                retry: 0,
                sender: request::ResultExpectation::External(sender),
                cmd,
            });
        Ok(())
    }

    /// Runs the driver loop: deliver a stored refresh error, finish any recovery, process
    /// requests, and start whatever recovery the processed requests asked for.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context,
    ) -> Poll<Result<(), Self::Error>> {
        trace!("poll_flush: {:?}", self.state);
        loop {
            self.send_refresh_error();

            if let Err(err) = ready!(self.as_mut().poll_recover(cx)) {
                // Keep the error so `send_refresh_error` can deliver it on the next pass.
                self.refresh_error = Some(err);

                // Ask to be polled again right away so the recovery is retried.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }

            match ready!(self.poll_complete(cx)) {
                PollFlushAction::None => return Poll::Ready(Ok(())),
                PollFlushAction::RebuildSlots => {
                    self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
                        Self::refresh_slots(self.inner.clone()),
                    )));
                }
                PollFlushAction::Reconnect(addrs) => {
                    self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
                        self.refresh_connections(addrs),
                    )));
                }
                PollFlushAction::ReconnectFromInitialConnections => {
                    self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
                        self.reconnect_to_initial_nodes(),
                    )));
                }
            }
        }
    }

    /// Tries to finish the in-flight requests and completes once none remain.
    ///
    /// Fails with `Err(())` if the requests ask for a recovery action while closing.
    fn poll_close(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context,
    ) -> Poll<Result<(), Self::Error>> {
        match self.poll_complete(cx) {
            Poll::Ready(PollFlushAction::None) => (),
            Poll::Ready(_) => Err(())?,
            Poll::Pending => (),
        };
        // Nothing left in flight: done, without going through `poll_flush`.
        if self.in_flight_requests.is_empty() {
            return Poll::Ready(Ok(()));
        }

        self.poll_flush(cx)
    }
}
1257
/// Lets a `ClusterConnection` be used wherever a `ConnectionLike` is expected.
impl<C> ConnectionLike for ClusterConnection<C>
where
    C: ConnectionLike + Send + Clone + Unpin + Sync + Connect + 'static,
{
    /// Routes `cmd` using the routing derived from the command itself, falling back to a
    /// random node when none can be derived.
    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
        let routing = RoutingInfo::for_routable(cmd)
            .unwrap_or(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random));
        self.route_command(cmd, routing).boxed()
    }

    /// Determines a single route for the whole pipeline and sends it there.
    fn req_packed_commands<'a>(
        &'a mut self,
        pipeline: &'a crate::Pipeline,
        offset: usize,
        count: usize,
    ) -> RedisFuture<'a, Vec<Value>> {
        async move {
            let route = route_for_pipeline(pipeline)?;
            self.route_pipeline(pipeline, offset, count, route.into())
                .await
        }
        .boxed()
    }

    /// Always returns 0.
    fn get_db(&self) -> i64 {
        0
    }
}
/// Abstraction over how a connection to a single node is established.
pub trait Connect: Sized {
    /// Connects to the node described by `info` using the settings in `config`.
    fn connect_with_config<'a, T>(info: T, config: AsyncConnectionConfig) -> RedisFuture<'a, Self>
    where
        T: IntoConnectionInfo + Send + 'a;
}
1294
/// Connects by opening a `Client` for the node and requesting a multiplexed connection.
impl Connect for MultiplexedConnection {
    fn connect_with_config<'a, T>(info: T, config: AsyncConnectionConfig) -> RedisFuture<'a, Self>
    where
        T: IntoConnectionInfo + Send + 'a,
    {
        async move {
            let connection_info = info.into_connection_info()?;
            let client = crate::Client::open(connection_info)?;
            client
                .get_multiplexed_async_connection_with_config(&config)
                .await
        }
        .boxed()
    }
}
1310
1311async fn connect_check_and_add<C>(core: Core<C>, addr: String) -> RedisResult<C>
1312where
1313 C: ConnectionLike + Connect + Send + Clone + 'static,
1314{
1315 match connect_and_check::<C>(&addr, core.cluster_params.clone()).await {
1316 Ok(conn) => {
1317 let conn_clone = conn.clone();
1318 core.conn_lock
1319 .write()
1320 .await
1321 .0
1322 .insert(addr, async { conn_clone }.boxed().shared());
1323 Ok(conn)
1324 }
1325 Err(err) => Err(err),
1326 }
1327}
1328
/// Opens a connection to `node` configured from `params` and verifies it with one command.
///
/// The verification command is `READONLY` when `read_from_replicas` is set and `PING`
/// otherwise.
///
/// # Errors
///
/// Returns an error if the connection info cannot be built, the connection attempt fails
/// (also logged as a warning), or sending the verification command fails.
async fn connect_and_check<C>(node: &str, params: ClusterParams) -> RedisResult<C>
where
    C: ConnectionLike + Connect + Send + 'static,
{
    // Copy out what is needed before `params` is consumed by `get_connection_info`.
    let read_from_replicas = params.read_from_replicas;
    let connection_timeout = params.connection_timeout;
    let response_timeout = params.response_timeout;
    let push_sender = params.async_push_sender.clone();
    let tcp_settings = params.tcp_settings.clone();
    let info = get_connection_info(node, params)?;
    let mut config = AsyncConnectionConfig::default()
        .set_connection_timeout(connection_timeout)
        .set_tcp_settings(tcp_settings);
    if let Some(response_timeout) = response_timeout {
        config = config.set_response_timeout(response_timeout);
    };
    if let Some(push_sender) = push_sender {
        config = config.set_push_sender_internal(push_sender);
    }
    let mut conn = match C::connect_with_config(info, config).await {
        Ok(conn) => conn,
        Err(err) => {
            warn!("Failed to connect to node: {:?}, due to: {:?}", node, err);
            return Err(err);
        }
    };

    let check = if read_from_replicas {
        cmd("READONLY")
    } else {
        cmd("PING")
    };

    conn.req_packed_command(&check).await?;
    Ok(conn)
}
1366
1367async fn check_connection<C>(conn: &mut C) -> RedisResult<()>
1368where
1369 C: ConnectionLike + Send + 'static,
1370{
1371 let mut cmd = Cmd::new();
1372 cmd.arg("PING");
1373 cmd.query_async::<String>(conn).await?;
1374 Ok(())
1375}
1376
1377fn get_random_connection<C>(connections: &ConnectionMap<C>) -> Option<(String, ConnectionFuture<C>)>
1378where
1379 C: Clone,
1380{
1381 connections.keys().choose(&mut rng()).and_then(|addr| {
1382 connections
1383 .get(addr)
1384 .map(|conn| (addr.clone(), conn.clone()))
1385 })
1386}