1use std::{
94 collections::HashMap,
95 fmt,
96 future::Future,
97 io, mem,
98 pin::Pin,
99 sync::{Arc, Mutex},
100 task::{self, Poll},
101 time::Duration,
102};
103
104mod request;
105mod routing;
106use crate::{
107 aio::{check_resp3, ConnectionLike, HandleContainer, MultiplexedConnection, Runtime},
108 cluster::{get_connection_info, slot_cmd},
109 cluster_client::ClusterParams,
110 cluster_routing::{
111 MultipleNodeRoutingInfo, Redirect, ResponsePolicy, RoutingInfo, SingleNodeRoutingInfo,
112 Slot, SlotMap,
113 },
114 cluster_topology::parse_slots,
115 cmd,
116 subscription_tracker::SubscriptionTracker,
117 types::closed_connection_error,
118 AsyncConnectionConfig, Cmd, ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError,
119 RedisFuture, RedisResult, ToRedisArgs, Value,
120};
121
122#[cfg(feature = "cache-aio")]
123use crate::caching::{CacheManager, CacheStatistics};
124use crate::ProtocolVersion;
125use futures_sink::Sink;
126use futures_util::{
127 future::{self, BoxFuture, FutureExt},
128 ready,
129 stream::{self, Stream, StreamExt},
130};
131use log::{debug, trace, warn};
132use rand::{rng, seq::IteratorRandom};
133use request::{CmdArg, PendingRequest, Request, RequestState, Retry};
134use routing::{route_for_pipeline, InternalRoutingInfo, InternalSingleNodeRouting};
135use tokio::sync::{mpsc, oneshot, RwLock};
136
137struct ClientSideState {
138 protocol: ProtocolVersion,
139 _task_handle: HandleContainer,
140 response_timeout: Option<Duration>,
141 runtime: Runtime,
142 #[cfg(feature = "cache-aio")]
143 cache_manager: Option<CacheManager>,
144}
145
146#[derive(Clone)]
151pub struct ClusterConnection<C = MultiplexedConnection> {
152 state: Arc<ClientSideState>,
153 sender: mpsc::Sender<Message<C>>,
154}
155
156impl<C> ClusterConnection<C>
157where
158 C: ConnectionLike + Connect + Clone + Send + Sync + Unpin + 'static,
159{
160 pub(crate) async fn new(
161 initial_nodes: &[ConnectionInfo],
162 cluster_params: ClusterParams,
163 ) -> RedisResult<ClusterConnection<C>> {
164 let protocol = cluster_params.protocol.unwrap_or_default();
165 let response_timeout = cluster_params.response_timeout;
166 #[cfg(feature = "cache-aio")]
167 let cache_manager = cluster_params.cache_manager.clone();
168 let runtime = Runtime::locate();
169 ClusterConnInner::new(initial_nodes, cluster_params)
170 .await
171 .map(|inner| {
172 let (sender, mut receiver) = mpsc::channel::<Message<_>>(100);
173 let stream = async move {
174 let _ = stream::poll_fn(move |cx| receiver.poll_recv(cx))
175 .map(Ok)
176 .forward(inner)
177 .await;
178 };
179 let _task_handle = HandleContainer::new(runtime.spawn(stream));
180
181 ClusterConnection {
182 sender,
183 state: Arc::new(ClientSideState {
184 protocol,
185 _task_handle,
186 response_timeout,
187 runtime,
188 #[cfg(feature = "cache-aio")]
189 cache_manager,
190 }),
191 }
192 })
193 }
194
195 pub async fn route_command(&mut self, cmd: &Cmd, routing: RoutingInfo) -> RedisResult<Value> {
197 trace!("send_packed_command");
198 let (sender, receiver) = oneshot::channel();
199 let request = async {
200 self.sender
201 .send(Message {
202 cmd: CmdArg::Cmd {
203 cmd: Arc::new(cmd.clone()), routing: routing.into(),
205 },
206 sender,
207 })
208 .await
209 .map_err(|_| {
210 RedisError::from(io::Error::new(
211 io::ErrorKind::BrokenPipe,
212 "redis_cluster: Unable to send command",
213 ))
214 })?;
215
216 receiver
217 .await
218 .unwrap_or_else(|_| {
219 Err(RedisError::from(io::Error::new(
220 io::ErrorKind::BrokenPipe,
221 "redis_cluster: Unable to receive command",
222 )))
223 })
224 .map(|response| match response {
225 Response::Single(value) => value,
226 Response::Multiple(_) => unreachable!(),
227 })
228 };
229
230 match self.state.response_timeout {
231 Some(duration) => self.state.runtime.timeout(duration, request).await?,
232 None => request.await,
233 }
234 }
235
236 pub async fn route_pipeline<'a>(
238 &'a mut self,
239 pipeline: &'a crate::Pipeline,
240 offset: usize,
241 count: usize,
242 route: SingleNodeRoutingInfo,
243 ) -> RedisResult<Vec<Value>> {
244 let (sender, receiver) = oneshot::channel();
245
246 let request = async {
247 self.sender
248 .send(Message {
249 cmd: CmdArg::Pipeline {
250 pipeline: Arc::new(pipeline.clone()), offset,
252 count,
253 route: route.into(),
254 },
255 sender,
256 })
257 .await
258 .map_err(|_| closed_connection_error())?;
259 receiver
260 .await
261 .unwrap_or_else(|_| Err(closed_connection_error()))
262 .map(|response| match response {
263 Response::Multiple(values) => values,
264 Response::Single(_) => unreachable!(),
265 })
266 };
267
268 match self.state.response_timeout {
269 Some(duration) => self.state.runtime.timeout(duration, request).await?,
270 None => request.await,
271 }
272 }
273
274 pub async fn subscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
283 check_resp3!(self.state.protocol);
284 let mut cmd = cmd("SUBSCRIBE");
285 cmd.arg(channel_name);
286 cmd.exec_async(self).await?;
287 Ok(())
288 }
289
290 pub async fn unsubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
294 check_resp3!(self.state.protocol);
295 let mut cmd = cmd("UNSUBSCRIBE");
296 cmd.arg(channel_name);
297 cmd.exec_async(self).await?;
298 Ok(())
299 }
300
301 pub async fn psubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> {
310 check_resp3!(self.state.protocol);
311 let mut cmd = cmd("PSUBSCRIBE");
312 cmd.arg(channel_pattern);
313 cmd.exec_async(self).await?;
314 Ok(())
315 }
316
317 pub async fn punsubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> {
321 check_resp3!(self.state.protocol);
322 let mut cmd = cmd("PUNSUBSCRIBE");
323 cmd.arg(channel_pattern);
324 cmd.exec_async(self).await?;
325 Ok(())
326 }
327
328 pub async fn ssubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
337 check_resp3!(self.state.protocol);
338 let mut cmd = cmd("SSUBSCRIBE");
339 cmd.arg(channel_name);
340 cmd.exec_async(self).await?;
341 Ok(())
342 }
343
344 pub async fn sunsubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> {
348 check_resp3!(self.state.protocol);
349 let mut cmd = cmd("SUNSUBSCRIBE");
350 cmd.arg(channel_name);
351 cmd.exec_async(self).await?;
352 Ok(())
353 }
354 #[cfg(feature = "cache-aio")]
356 #[cfg_attr(docsrs, doc(cfg(feature = "cache-aio")))]
357 pub fn get_cache_statistics(&self) -> Option<CacheStatistics> {
358 self.state.cache_manager.as_ref().map(|cm| cm.statistics())
359 }
360}
361
362type ConnectionMap<C> = HashMap<String, C>;
363
364struct InnerCore<C> {
368 conn_lock: RwLock<(ConnectionMap<C>, SlotMap)>,
369 cluster_params: ClusterParams,
370 pending_requests: Mutex<Vec<PendingRequest<C>>>,
371 initial_nodes: Vec<ConnectionInfo>,
372 subscription_tracker: Option<Mutex<SubscriptionTracker>>,
373}
374
375type Core<C> = Arc<InnerCore<C>>;
376
377struct ClusterConnInner<C> {
381 inner: Core<C>,
382 state: ConnectionState,
383 #[allow(clippy::complexity)]
384 in_flight_requests: stream::FuturesUnordered<Pin<Box<Request<C>>>>,
385 refresh_error: Option<RedisError>,
386}
387
388fn boxed_sleep(duration: Duration) -> BoxFuture<'static, ()> {
389 Box::pin(Runtime::locate_and_sleep(duration))
390}
391
392#[derive(Debug, PartialEq)]
393pub(crate) enum Response {
394 Single(Value),
395 Multiple(Vec<Value>),
396}
397
398enum OperationTarget {
399 Node { address: String },
400 NotFound,
401 FanOut,
402}
403type OperationResult = Result<Response, (OperationTarget, RedisError)>;
404
405impl From<String> for OperationTarget {
406 fn from(address: String) -> Self {
407 OperationTarget::Node { address }
408 }
409}
410
411struct Message<C> {
412 cmd: CmdArg<C>,
413 sender: oneshot::Sender<RedisResult<Response>>,
414}
415
416enum RecoverFuture {
417 RecoverSlots(BoxFuture<'static, RedisResult<()>>),
418 Reconnect(BoxFuture<'static, ()>),
419}
420
421enum ConnectionState {
422 PollComplete,
423 Recover(RecoverFuture),
424}
425
426impl fmt::Debug for ConnectionState {
427 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
428 write!(
429 f,
430 "{}",
431 match self {
432 ConnectionState::PollComplete => "PollComplete",
433 ConnectionState::Recover(_) => "Recover",
434 }
435 )
436 }
437}
438
439impl<C> ClusterConnInner<C>
440where
441 C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
442{
443 async fn new(
444 initial_nodes: &[ConnectionInfo],
445 cluster_params: ClusterParams,
446 ) -> RedisResult<Self> {
447 let connections = Self::create_initial_connections(initial_nodes, &cluster_params).await?;
448 let subscription_tracker = if cluster_params.async_push_sender.is_some() {
449 Some(Mutex::new(SubscriptionTracker::default()))
450 } else {
451 None
452 };
453 let inner = Arc::new(InnerCore {
454 conn_lock: RwLock::new((connections, SlotMap::new(cluster_params.read_from_replicas))),
455 cluster_params,
456 pending_requests: Mutex::new(Vec::new()),
457 initial_nodes: initial_nodes.to_vec(),
458 subscription_tracker,
459 });
460 let connection = ClusterConnInner {
461 inner,
462 in_flight_requests: Default::default(),
463 refresh_error: None,
464 state: ConnectionState::PollComplete,
465 };
466 Self::refresh_slots(connection.inner.clone()).await?;
467 Ok(connection)
468 }
469
470 async fn create_initial_connections(
471 initial_nodes: &[ConnectionInfo],
472 params: &ClusterParams,
473 ) -> RedisResult<ConnectionMap<C>> {
474 let (connections, error) = stream::iter(initial_nodes.iter().cloned())
475 .map(|info| {
476 let params = params.clone();
477 async move {
478 let addr = info.addr.to_string();
479 let result = connect_and_check(&addr, params).await;
480 match result {
481 Ok(conn) => Ok((addr, conn)),
482 Err(e) => {
483 debug!("Failed to connect to initial node: {e:?}");
484 Err(e)
485 }
486 }
487 }
488 })
489 .buffer_unordered(initial_nodes.len())
490 .fold(
491 (ConnectionMap::<C>::with_capacity(initial_nodes.len()), None),
492 |(mut connections, mut error), result| async move {
493 match result {
494 Ok((addr, conn)) => {
495 connections.insert(addr, conn);
496 }
497 Err(err) => {
498 error = Some(err);
501 }
502 }
503 (connections, error)
504 },
505 )
506 .await;
507 if connections.is_empty() {
508 if let Some(err) = error {
509 return Err(RedisError::from((
510 ErrorKind::IoError,
511 "Failed to create initial connections",
512 err.to_string(),
513 )));
514 } else {
515 return Err(RedisError::from((
516 ErrorKind::IoError,
517 "Failed to create initial connections",
518 )));
519 }
520 }
521 Ok(connections)
522 }
523
524 fn resubscribe(&self) {
525 let Some(subscription_tracker) = self.inner.subscription_tracker.as_ref() else {
526 return;
527 };
528
529 let subscription_pipe = subscription_tracker
530 .lock()
531 .unwrap()
532 .get_subscription_pipeline();
533
534 let requests = subscription_pipe.cmd_iter().map(|cmd| {
536 let routing = RoutingInfo::for_routable(cmd)
537 .unwrap_or(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
538 .into();
539 PendingRequest {
540 retry: 0,
541 sender: request::ResultExpectation::Internal,
542 cmd: CmdArg::Cmd {
543 cmd: Arc::new(cmd.clone()),
544 routing,
545 },
546 }
547 });
548 self.inner.pending_requests.lock().unwrap().extend(requests);
549 }
550
551 fn reconnect_to_initial_nodes(&mut self) -> impl Future<Output = ()> {
552 debug!("Received request to reconnect to initial nodes");
553 let inner = self.inner.clone();
554 async move {
555 let connection_map =
556 match Self::create_initial_connections(&inner.initial_nodes, &inner.cluster_params)
557 .await
558 {
559 Ok(map) => map,
560 Err(err) => {
561 warn!("Can't reconnect to initial nodes: `{err}`");
562 return;
563 }
564 };
565 let mut write_lock = inner.conn_lock.write().await;
566 *write_lock = (
567 connection_map,
568 SlotMap::new(inner.cluster_params.read_from_replicas),
569 );
570 drop(write_lock);
571 if let Err(err) = Self::refresh_slots(inner.clone()).await {
572 warn!("Can't refresh slots with initial nodes: `{err}`");
573 };
574 }
575 }
576
577 fn refresh_connections(&mut self, addrs: Vec<String>) -> impl Future<Output = ()> {
578 let inner = self.inner.clone();
579 async move {
580 let mut write_guard = inner.conn_lock.write().await;
581
582 Self::refresh_connections_locked(&inner, &mut write_guard.0, addrs).await;
583 }
584 }
585
586 async fn refresh_slots(inner: Core<C>) -> RedisResult<()> {
588 let mut write_guard = inner.conn_lock.write().await;
589 let (connections, slots) = &mut *write_guard;
590
591 let mut result = Ok(());
592 for (addr, conn) in &mut *connections {
593 result = async {
594 let value = conn
595 .req_packed_command(&slot_cmd())
596 .await
597 .and_then(|value| value.extract_error())?;
598 let v: Vec<Slot> = parse_slots(
599 value,
600 inner.cluster_params.tls,
601 addr.rsplit_once(':').unwrap().0,
602 )?;
603 Self::build_slot_map(slots, v)
604 }
605 .await;
606 if result.is_ok() {
607 break;
608 }
609 }
610 result?;
611
612 let mut nodes = slots.values().flatten().cloned().collect::<Vec<_>>();
613 nodes.sort_unstable();
614 nodes.dedup();
615 Self::refresh_connections_locked(&inner, connections, nodes).await;
616
617 Ok(())
618 }
619
620 async fn refresh_connections_locked(
621 inner: &Core<C>,
622 connections: &mut ConnectionMap<C>,
623 nodes: Vec<String>,
624 ) {
625 let nodes_len = nodes.len();
626
627 let addresses_and_connections_iter = nodes.into_iter().map(|addr| {
628 let value = connections.remove(&addr);
629 (addr, value)
630 });
631
632 let inner = &inner;
633 *connections = stream::iter(addresses_and_connections_iter)
634 .map(|(addr, connection)| async move {
635 (
636 addr.clone(),
637 Self::get_or_create_conn(&addr, connection, &inner.cluster_params).await,
638 )
639 })
640 .buffer_unordered(nodes_len.max(8))
641 .fold(
642 HashMap::with_capacity(nodes_len),
643 |mut connections, (addr, result)| async move {
644 if let Ok(conn) = result {
645 connections.insert(addr, conn);
646 }
647 connections
648 },
649 )
650 .await;
651 }
652
653 fn build_slot_map(slot_map: &mut SlotMap, slots_data: Vec<Slot>) -> RedisResult<()> {
654 slot_map.clear();
655 slot_map.fill_slots(slots_data);
656 trace!("{slot_map:?}");
657 Ok(())
658 }
659
660 async fn aggregate_results(
661 receivers: Vec<(String, oneshot::Receiver<RedisResult<Response>>)>,
662 routing: &MultipleNodeRoutingInfo,
663 response_policy: Option<ResponsePolicy>,
664 ) -> RedisResult<Value> {
665 if receivers.is_empty() {
666 return Err((
667 ErrorKind::ClusterConnectionNotFound,
668 "No nodes found for multi-node operation",
669 )
670 .into());
671 }
672
673 let extract_result = |response| match response {
674 Response::Single(value) => value,
675 Response::Multiple(_) => unreachable!(),
676 };
677
678 let convert_result = |res: Result<RedisResult<Response>, _>| {
679 res.map_err(|_| RedisError::from((ErrorKind::ResponseError, "request wasn't handled due to internal failure"))) .and_then(|res| res.map(extract_result))
681 };
682
683 let get_receiver = |(_, receiver): (_, oneshot::Receiver<RedisResult<Response>>)| async {
684 convert_result(receiver.await)
685 };
686
687 match response_policy {
689 Some(ResponsePolicy::AllSucceeded) => {
690 future::try_join_all(receivers.into_iter().map(get_receiver))
691 .await
692 .and_then(|mut results| {
693 results.pop().ok_or(
694 (
695 ErrorKind::ClusterConnectionNotFound,
696 "No results received for multi-node operation",
697 )
698 .into(),
699 )
700 })
701 }
702 Some(ResponsePolicy::OneSucceeded) => future::select_ok(
703 receivers
704 .into_iter()
705 .map(|tuple| Box::pin(get_receiver(tuple))),
706 )
707 .await
708 .map(|(result, _)| result),
709 Some(ResponsePolicy::OneSucceededNonEmpty) => {
710 future::select_ok(receivers.into_iter().map(|(_, receiver)| {
711 Box::pin(async move {
712 let result = convert_result(receiver.await)?;
713 match result {
714 Value::Nil => Err((ErrorKind::ResponseError, "no value found").into()),
715 _ => Ok(result),
716 }
717 })
718 }))
719 .await
720 .map(|(result, _)| result)
721 }
722 Some(ResponsePolicy::Aggregate(op)) => {
723 future::try_join_all(receivers.into_iter().map(get_receiver))
724 .await
725 .and_then(|results| crate::cluster_routing::aggregate(results, op))
726 }
727 Some(ResponsePolicy::AggregateLogical(op)) => {
728 future::try_join_all(receivers.into_iter().map(get_receiver))
729 .await
730 .and_then(|results| crate::cluster_routing::logical_aggregate(results, op))
731 }
732 Some(ResponsePolicy::CombineArrays) => {
733 future::try_join_all(receivers.into_iter().map(get_receiver))
734 .await
735 .and_then(|results| match routing {
736 MultipleNodeRoutingInfo::MultiSlot(vec) => {
737 crate::cluster_routing::combine_and_sort_array_results(
738 results,
739 vec.iter().map(|(_, indices)| indices),
740 )
741 }
742 _ => crate::cluster_routing::combine_array_results(results),
743 })
744 }
745 Some(ResponsePolicy::Special) | None => {
746 future::try_join_all(receivers.into_iter().map(|(addr, receiver)| async move {
750 let result = convert_result(receiver.await)?;
751 Ok((Value::BulkString(addr.into_bytes()), result))
752 }))
753 .await
754 .map(Value::Map)
755 }
756 }
757 }
758
759 async fn execute_on_multiple_nodes<'a>(
760 cmd: &'a Arc<Cmd>,
761 routing: &'a MultipleNodeRoutingInfo,
762 core: Core<C>,
763 response_policy: Option<ResponsePolicy>,
764 ) -> OperationResult {
765 let read_guard = core.conn_lock.read().await;
766 if read_guard.0.is_empty() {
767 return OperationResult::Err((
768 OperationTarget::FanOut,
769 (
770 ErrorKind::ClusterConnectionNotFound,
771 "No connections found for multi-node operation",
772 )
773 .into(),
774 ));
775 }
776 let (receivers, requests): (Vec<_>, Vec<_>) = {
777 let to_request = |(addr, cmd): (&str, Arc<Cmd>)| {
778 read_guard.0.get(addr).cloned().map(|conn| {
779 let (sender, receiver) = oneshot::channel();
780 let addr = addr.to_string();
781 (
782 (addr.clone(), receiver),
783 PendingRequest {
784 retry: 0,
785 sender: request::ResultExpectation::External(sender),
786 cmd: CmdArg::Cmd {
787 cmd,
788 routing: InternalSingleNodeRouting::Connection {
789 identifier: addr,
790 conn,
791 }
792 .into(),
793 },
794 },
795 )
796 })
797 };
798 let slot_map = &read_guard.1;
799
800 match routing {
803 MultipleNodeRoutingInfo::AllNodes => slot_map
804 .addresses_for_all_nodes()
805 .into_iter()
806 .filter_map(|addr| to_request((addr, cmd.clone())))
807 .unzip(),
808 MultipleNodeRoutingInfo::AllMasters => slot_map
809 .addresses_for_all_primaries()
810 .into_iter()
811 .filter_map(|addr| to_request((addr, cmd.clone())))
812 .unzip(),
813 MultipleNodeRoutingInfo::MultiSlot(routes) => slot_map
814 .addresses_for_multi_slot(routes)
815 .enumerate()
816 .filter_map(|(index, addr_opt)| {
817 addr_opt.and_then(|addr| {
818 let (_, indices) = routes.get(index).unwrap();
819 let cmd =
820 Arc::new(crate::cluster_routing::command_for_multi_slot_indices(
821 cmd.as_ref(),
822 indices.iter(),
823 ));
824 to_request((addr, cmd))
825 })
826 })
827 .unzip(),
828 }
829 };
830 drop(read_guard);
831 core.pending_requests.lock().unwrap().extend(requests);
832
833 Self::aggregate_results(receivers, routing, response_policy)
834 .await
835 .map(Response::Single)
836 .map_err(|err| (OperationTarget::FanOut, err))
837 }
838
839 async fn try_cmd_request(
840 cmd: Arc<Cmd>,
841 routing: InternalRoutingInfo<C>,
842 core: Core<C>,
843 ) -> OperationResult {
844 let route = match routing {
845 InternalRoutingInfo::SingleNode(single_node_routing) => single_node_routing,
846 InternalRoutingInfo::MultiNode((multi_node_routing, response_policy)) => {
847 return Self::execute_on_multiple_nodes(
848 &cmd,
849 &multi_node_routing,
850 core,
851 response_policy,
852 )
853 .await;
854 }
855 };
856
857 match Self::get_connection(route, core).await {
858 Ok((addr, mut conn)) => conn
859 .req_packed_command(&cmd)
860 .await
861 .and_then(|value| value.extract_error())
862 .map(Response::Single)
863 .map_err(|err| (addr.into(), err)),
864 Err(err) => Err((OperationTarget::NotFound, err)),
865 }
866 }
867
868 async fn try_pipeline_request(
869 pipeline: Arc<crate::Pipeline>,
870 offset: usize,
871 count: usize,
872 conn: impl Future<Output = RedisResult<(String, C)>>,
873 ) -> OperationResult {
874 match conn.await {
875 Ok((addr, mut conn)) => conn
876 .req_packed_commands(&pipeline, offset, count)
877 .await
878 .and_then(Value::extract_error_vec)
879 .map(Response::Multiple)
880 .map_err(|err| (OperationTarget::Node { address: addr }, err)),
881 Err(err) => Err((OperationTarget::NotFound, err)),
882 }
883 }
884
885 async fn try_request(cmd: CmdArg<C>, core: Core<C>) -> OperationResult {
886 match cmd {
887 CmdArg::Cmd { cmd, routing } => Self::try_cmd_request(cmd, routing, core).await,
888 CmdArg::Pipeline {
889 pipeline,
890 offset,
891 count,
892 route,
893 } => {
894 Self::try_pipeline_request(
895 pipeline,
896 offset,
897 count,
898 Self::get_connection(route, core),
899 )
900 .await
901 }
902 }
903 }
904
905 async fn get_connection(
906 route: InternalSingleNodeRouting<C>,
907 core: Core<C>,
908 ) -> RedisResult<(String, C)> {
909 let read_guard = core.conn_lock.read().await;
910
911 let conn = match route {
912 InternalSingleNodeRouting::Random => None,
913 InternalSingleNodeRouting::SpecificNode(route) => read_guard
914 .1
915 .slot_addr_for_route(&route)
916 .map(|addr| addr.to_string()),
917 InternalSingleNodeRouting::Connection { identifier, conn } => {
918 return Ok((identifier, conn));
919 }
920 InternalSingleNodeRouting::Redirect { redirect, .. } => {
921 drop(read_guard);
922 return Self::get_redirected_connection(redirect, core).await;
924 }
925 InternalSingleNodeRouting::ByAddress(address) => {
926 if let Some(conn) = read_guard.0.get(&address).cloned() {
927 return Ok((address, conn));
928 } else {
929 return Err((
930 ErrorKind::ClientError,
931 "Requested connection not found",
932 address,
933 )
934 .into());
935 }
936 }
937 }
938 .map(|addr| {
939 let conn = read_guard.0.get(&addr).cloned();
940 (addr, conn)
941 });
942 drop(read_guard);
943
944 let addr_conn_option = match conn {
945 Some((addr, Some(conn))) => Some((addr, conn)),
946 Some((addr, None)) => connect_check_and_add(core.clone(), addr.clone())
947 .await
948 .ok()
949 .map(|conn| (addr, conn)),
950 None => None,
951 };
952
953 let (addr, conn) = match addr_conn_option {
954 Some(tuple) => tuple,
955 None => {
956 let read_guard = core.conn_lock.read().await;
957 if let Some((random_addr, random_conn)) = get_random_connection(&read_guard.0) {
958 drop(read_guard);
959 (random_addr, random_conn)
960 } else {
961 return Err(
962 (ErrorKind::ClusterConnectionNotFound, "No connections found").into(),
963 );
964 }
965 }
966 };
967
968 Ok((addr, conn))
969 }
970
971 async fn get_redirected_connection(
972 redirect: Redirect,
973 core: Core<C>,
974 ) -> RedisResult<(String, C)> {
975 let asking = matches!(redirect, Redirect::Ask(_));
976 let addr = match redirect {
977 Redirect::Moved(addr) => addr,
978 Redirect::Ask(addr) => addr,
979 };
980 let read_guard = core.conn_lock.read().await;
981 let conn = read_guard.0.get(&addr).cloned();
982 drop(read_guard);
983 let mut conn = match conn {
984 Some(conn) => conn,
985 None => connect_check_and_add(core.clone(), addr.clone()).await?,
986 };
987 if asking {
988 let _ = conn
989 .req_packed_command(&crate::cmd::cmd("ASKING"))
990 .await
991 .and_then(|value| value.extract_error());
992 }
993
994 Ok((addr, conn))
995 }
996
997 fn poll_recover(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), RedisError>> {
998 let recover_future = match &mut self.state {
999 ConnectionState::PollComplete => return Poll::Ready(Ok(())),
1000 ConnectionState::Recover(future) => future,
1001 };
1002 let res = match recover_future {
1003 RecoverFuture::RecoverSlots(ref mut future) => match ready!(future.as_mut().poll(cx)) {
1004 Ok(_) => {
1005 trace!("Recovered!");
1006 self.state = ConnectionState::PollComplete;
1007 Ok(())
1008 }
1009 Err(err) => {
1010 trace!("Recover slots failed!");
1011 *future = Box::pin(Self::refresh_slots(self.inner.clone()));
1012 Err(err)
1013 }
1014 },
1015 RecoverFuture::Reconnect(ref mut future) => {
1016 ready!(future.as_mut().poll(cx));
1017 trace!("Reconnected connections");
1018 self.state = ConnectionState::PollComplete;
1019 Ok(())
1020 }
1021 };
1022 if res.is_ok() {
1023 self.resubscribe();
1024 }
1025 Poll::Ready(res)
1026 }
1027
1028 fn poll_complete(&mut self, cx: &mut task::Context<'_>) -> Poll<PollFlushAction> {
1029 let mut poll_flush_action = PollFlushAction::None;
1030
1031 let mut pending_requests_guard = self.inner.pending_requests.lock().unwrap();
1032 if !pending_requests_guard.is_empty() {
1033 let mut pending_requests = mem::take(&mut *pending_requests_guard);
1034 for request in pending_requests.drain(..) {
1035 if request.sender.is_closed() {
1039 continue;
1040 }
1041
1042 let future = Self::try_request(request.cmd.clone(), self.inner.clone()).boxed();
1043 self.in_flight_requests.push(Box::pin(Request {
1044 retry_params: self.inner.cluster_params.retry_params.clone(),
1045 request: Some(request),
1046 future: RequestState::Future { future },
1047 }));
1048 }
1049 *pending_requests_guard = pending_requests;
1050 }
1051 drop(pending_requests_guard);
1052
1053 loop {
1054 let (request_handling, next) =
1055 match Pin::new(&mut self.in_flight_requests).poll_next(cx) {
1056 Poll::Ready(Some(result)) => result,
1057 Poll::Ready(None) | Poll::Pending => break,
1058 };
1059 match request_handling {
1060 Some(Retry::MoveToPending { request }) => {
1061 self.inner.pending_requests.lock().unwrap().push(request);
1062 }
1063 Some(Retry::Immediately { request }) => {
1064 let future = Self::try_request(request.cmd.clone(), self.inner.clone());
1065 self.in_flight_requests.push(Box::pin(Request {
1066 retry_params: self.inner.cluster_params.retry_params.clone(),
1067 request: Some(request),
1068 future: RequestState::Future {
1069 future: Box::pin(future),
1070 },
1071 }));
1072 }
1073 Some(Retry::AfterSleep {
1074 request,
1075 sleep_duration,
1076 }) => {
1077 let future = RequestState::Sleep {
1078 sleep: boxed_sleep(sleep_duration),
1079 };
1080 self.in_flight_requests.push(Box::pin(Request {
1081 retry_params: self.inner.cluster_params.retry_params.clone(),
1082 request: Some(request),
1083 future,
1084 }));
1085 }
1086 None => {}
1087 };
1088 poll_flush_action = poll_flush_action.change_state(next);
1089 }
1090
1091 if !matches!(poll_flush_action, PollFlushAction::None) || self.in_flight_requests.is_empty()
1092 {
1093 Poll::Ready(poll_flush_action)
1094 } else {
1095 Poll::Pending
1096 }
1097 }
1098
1099 fn send_refresh_error(&mut self) {
1100 if self.refresh_error.is_some() {
1101 if let Some(mut request) = Pin::new(&mut self.in_flight_requests)
1102 .iter_pin_mut()
1103 .find(|request| request.request.is_some())
1104 {
1105 (*request)
1106 .as_mut()
1107 .respond(Err(self.refresh_error.take().unwrap()));
1108 } else {
1109 let maybe_request = self.inner.pending_requests.lock().unwrap().pop();
1111 if let Some(request) = maybe_request {
1112 request.sender.send(Err(self.refresh_error.take().unwrap()));
1113 }
1114 }
1115 }
1116 }
1117
1118 async fn get_or_create_conn(
1119 addr: &str,
1120 conn_option: Option<C>,
1121 params: &ClusterParams,
1122 ) -> RedisResult<C> {
1123 if let Some(mut conn) = conn_option {
1124 match check_connection(&mut conn).await {
1125 Ok(_) => Ok(conn),
1126 Err(_) => connect_and_check(addr, params.clone()).await,
1127 }
1128 } else {
1129 connect_and_check(addr, params.clone()).await
1130 }
1131 }
1132}
1133
1134#[derive(Debug, PartialEq)]
1135enum PollFlushAction {
1136 None,
1137 RebuildSlots,
1138 Reconnect(Vec<String>),
1139 ReconnectFromInitialConnections,
1140}
1141
1142impl PollFlushAction {
1143 fn change_state(self, next_state: PollFlushAction) -> PollFlushAction {
1144 match (self, next_state) {
1145 (PollFlushAction::None, next_state) => next_state,
1146 (next_state, PollFlushAction::None) => next_state,
1147 (PollFlushAction::ReconnectFromInitialConnections, _)
1148 | (_, PollFlushAction::ReconnectFromInitialConnections) => {
1149 PollFlushAction::ReconnectFromInitialConnections
1150 }
1151
1152 (PollFlushAction::RebuildSlots, _) | (_, PollFlushAction::RebuildSlots) => {
1153 PollFlushAction::RebuildSlots
1154 }
1155
1156 (PollFlushAction::Reconnect(mut addrs), PollFlushAction::Reconnect(new_addrs)) => {
1157 addrs.extend(new_addrs);
1158 Self::Reconnect(addrs)
1159 }
1160 }
1161 }
1162}
1163
1164impl<C> Sink<Message<C>> for ClusterConnInner<C>
1165where
1166 C: ConnectionLike + Connect + Clone + Send + Sync + Unpin + 'static,
1167{
1168 type Error = ();
1169
1170 fn poll_ready(self: Pin<&mut Self>, _cx: &mut task::Context) -> Poll<Result<(), Self::Error>> {
1171 Poll::Ready(Ok(()))
1172 }
1173
1174 fn start_send(self: Pin<&mut Self>, msg: Message<C>) -> Result<(), Self::Error> {
1175 trace!("start_send");
1176 let Message { cmd, sender } = msg;
1177
1178 if let Some(tracker) = &self.inner.subscription_tracker {
1179 let mut tracker = tracker.lock().unwrap();
1181 match &cmd {
1182 CmdArg::Cmd { cmd, .. } => tracker.update_with_cmd(cmd.as_ref()),
1183 CmdArg::Pipeline { pipeline, .. } => {
1184 tracker.update_with_pipeline(pipeline.as_ref())
1185 }
1186 }
1187 };
1188
1189 self.inner
1190 .pending_requests
1191 .lock()
1192 .unwrap()
1193 .push(PendingRequest {
1194 retry: 0,
1195 sender: request::ResultExpectation::External(sender),
1196 cmd,
1197 });
1198 Ok(())
1199 }
1200
1201 fn poll_flush(
1202 mut self: Pin<&mut Self>,
1203 cx: &mut task::Context,
1204 ) -> Poll<Result<(), Self::Error>> {
1205 trace!("poll_flush: {:?}", self.state);
1206 loop {
1207 self.send_refresh_error();
1208
1209 if let Err(err) = ready!(self.as_mut().poll_recover(cx)) {
1210 self.refresh_error = Some(err);
1214
1215 cx.waker().wake_by_ref();
1219 return Poll::Pending;
1220 }
1221
1222 match ready!(self.poll_complete(cx)) {
1223 PollFlushAction::None => return Poll::Ready(Ok(())),
1224 PollFlushAction::RebuildSlots => {
1225 self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
1226 Self::refresh_slots(self.inner.clone()),
1227 )));
1228 }
1229 PollFlushAction::Reconnect(addrs) => {
1230 self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
1231 self.refresh_connections(addrs),
1232 )));
1233 }
1234 PollFlushAction::ReconnectFromInitialConnections => {
1235 self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
1236 self.reconnect_to_initial_nodes(),
1237 )));
1238 }
1239 }
1240 }
1241 }
1242
1243 fn poll_close(
1244 mut self: Pin<&mut Self>,
1245 cx: &mut task::Context,
1246 ) -> Poll<Result<(), Self::Error>> {
1247 match self.poll_complete(cx) {
1249 Poll::Ready(PollFlushAction::None) => (),
1250 Poll::Ready(_) => Err(())?,
1251 Poll::Pending => (),
1252 };
1253 if self.in_flight_requests.is_empty() {
1256 return Poll::Ready(Ok(()));
1257 }
1258
1259 self.poll_flush(cx)
1260 }
1261}
1262
1263impl<C> ConnectionLike for ClusterConnection<C>
1264where
1265 C: ConnectionLike + Send + Clone + Unpin + Sync + Connect + 'static,
1266{
1267 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
1268 let routing = RoutingInfo::for_routable(cmd)
1269 .unwrap_or(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random));
1270 self.route_command(cmd, routing).boxed()
1271 }
1272
1273 fn req_packed_commands<'a>(
1274 &'a mut self,
1275 pipeline: &'a crate::Pipeline,
1276 offset: usize,
1277 count: usize,
1278 ) -> RedisFuture<'a, Vec<Value>> {
1279 async move {
1280 let route = route_for_pipeline(pipeline)?;
1281 self.route_pipeline(pipeline, offset, count, route.into())
1282 .await
1283 }
1284 .boxed()
1285 }
1286
1287 fn get_db(&self) -> i64 {
1288 0
1289 }
1290}
1291pub trait Connect: Sized {
1294 fn connect_with_config<'a, T>(info: T, config: AsyncConnectionConfig) -> RedisFuture<'a, Self>
1296 where
1297 T: IntoConnectionInfo + Send + 'a;
1298}
1299
1300impl Connect for MultiplexedConnection {
1301 fn connect_with_config<'a, T>(info: T, config: AsyncConnectionConfig) -> RedisFuture<'a, Self>
1302 where
1303 T: IntoConnectionInfo + Send + 'a,
1304 {
1305 async move {
1306 let connection_info = info.into_connection_info()?;
1307 let client = crate::Client::open(connection_info)?;
1308 client
1309 .get_multiplexed_async_connection_with_config(&config)
1310 .await
1311 }
1312 .boxed()
1313 }
1314}
1315
1316async fn connect_check_and_add<C>(core: Core<C>, addr: String) -> RedisResult<C>
1317where
1318 C: ConnectionLike + Connect + Send + Clone + 'static,
1319{
1320 match connect_and_check::<C>(&addr, core.cluster_params.clone()).await {
1321 Ok(conn) => {
1322 let conn_clone = conn.clone();
1323 core.conn_lock.write().await.0.insert(addr, conn_clone);
1324 Ok(conn)
1325 }
1326 Err(err) => Err(err),
1327 }
1328}
1329
1330async fn connect_and_check<C>(node: &str, params: ClusterParams) -> RedisResult<C>
1331where
1332 C: ConnectionLike + Connect + Send + 'static,
1333{
1334 let read_from_replicas = params.read_from_replicas;
1335 let connection_timeout = params.connection_timeout;
1336 let response_timeout = params.response_timeout;
1337 let push_sender = params.async_push_sender.clone();
1338 let tcp_settings = params.tcp_settings.clone();
1339 let dns_resolver = params.async_dns_resolver.clone();
1340 #[cfg(feature = "cache-aio")]
1341 let cache_manager = params.cache_manager.clone();
1342 let info = get_connection_info(node, params)?;
1343 let mut config = AsyncConnectionConfig::default()
1344 .set_connection_timeout(connection_timeout)
1345 .set_tcp_settings(tcp_settings);
1346 if let Some(response_timeout) = response_timeout {
1347 config = config.set_response_timeout(response_timeout);
1348 };
1349 if let Some(push_sender) = push_sender {
1350 config = config.set_push_sender_internal(push_sender);
1351 }
1352 if let Some(resolver) = dns_resolver {
1353 config = config.set_dns_resolver_internal(resolver.clone());
1354 }
1355 #[cfg(feature = "cache-aio")]
1356 if let Some(cache_manager) = cache_manager {
1357 config = config.set_cache_manager(cache_manager.clone_and_increase_epoch());
1358 }
1359 let mut conn = match C::connect_with_config(info, config).await {
1360 Ok(conn) => conn,
1361 Err(err) => {
1362 warn!("Failed to connect to node: {node:?}, due to: {err:?}");
1363 return Err(err);
1364 }
1365 };
1366
1367 let check = if read_from_replicas {
1368 cmd("READONLY")
1370 } else {
1371 cmd("PING")
1372 };
1373
1374 conn.req_packed_command(&check).await?;
1375 Ok(conn)
1376}
1377
1378async fn check_connection<C>(conn: &mut C) -> RedisResult<()>
1379where
1380 C: ConnectionLike + Send + 'static,
1381{
1382 let mut cmd = Cmd::new();
1383 cmd.arg("PING");
1384 cmd.query_async::<String>(conn).await?;
1385 Ok(())
1386}
1387
1388fn get_random_connection<C>(connections: &ConnectionMap<C>) -> Option<(String, C)>
1389where
1390 C: Clone,
1391{
1392 connections.keys().choose(&mut rng()).and_then(|addr| {
1393 connections
1394 .get(addr)
1395 .map(|conn| (addr.clone(), conn.clone()))
1396 })
1397}