sea_streamer_redis/
cluster.rs1use std::{collections::HashMap, fmt::Debug, sync::Arc};
2
3use crate::{Connection, RedisConnectOptions, RedisResult};
4use sea_streamer_types::{export::url::Url, StreamErr, StreamUrlErr, StreamerUri};
5
6pub type NodeId = Url;
8
9#[derive(Debug)]
10pub struct RedisCluster {
12 pub(crate) cluster: StreamerUri,
13 pub(crate) options: Arc<RedisConnectOptions>,
14 pub(crate) conn: HashMap<NodeId, Connection>,
15 pub(crate) keys: HashMap<String, NodeId>,
16}
17
18impl RedisCluster {
19 pub fn new(cluster: StreamerUri, options: Arc<RedisConnectOptions>) -> RedisResult<Self> {
21 if cluster.nodes().is_empty() {
22 return Err(StreamErr::StreamUrlErr(StreamUrlErr::ZeroNode));
23 }
24 if cluster.protocol().is_none() {
25 return Err(StreamErr::StreamUrlErr(StreamUrlErr::ProtocolRequired));
26 }
27 Ok(Self {
28 cluster,
29 options,
30 conn: Default::default(),
31 keys: Default::default(),
32 })
33 }
34
35 pub fn protocol(&self) -> Option<&str> {
36 self.cluster.protocol()
37 }
38
39 pub async fn reconnect_all(&mut self) -> RedisResult<()> {
41 self.conn = Default::default();
42 for node in self.cluster.nodes() {
43 let conn = Connection::create(node.clone(), self.options.clone()).await?;
44 self.conn.insert(node.clone(), conn);
45 }
46 Ok(())
47 }
48
49 pub fn reconnect(&mut self, node: &NodeId) -> RedisResult<()> {
51 if let Some(inner) = self.conn.get_mut(node) {
52 inner.reconnect();
53 }
54 Ok(())
55 }
56
57 #[inline]
58 pub fn node_for(&self, key: &str) -> &NodeId {
60 Self::get_node_for(&self.keys, &self.cluster, key)
61 }
62
63 fn get_node_for<'a>(
64 keys: &'a HashMap<String, NodeId>,
65 cluster: &'a StreamerUri,
66 key: &str,
67 ) -> &'a NodeId {
68 if let Some(node) = keys.get(key) {
69 node
70 } else {
71 cluster.nodes().first().expect("Should not be empty")
72 }
73 }
74
75 pub fn moved(&mut self, key: &str, node: NodeId) {
77 if let Some(inner) = self.keys.get_mut(key) {
78 *inner = node;
79 } else {
80 self.keys.insert(key.to_owned(), node);
81 }
82 }
83
84 pub fn get_any(&mut self) -> RedisResult<(&NodeId, &mut redis::aio::MultiplexedConnection)> {
86 for (node, inner) in self.conn.iter_mut() {
87 if let Ok(conn) = inner.try_get() {
88 return Ok((node, conn));
89 }
90 }
91 Err(StreamErr::Connect("No open connections".to_owned()))
92 }
93
94 #[inline]
95 pub async fn get(
97 &mut self,
98 node: &NodeId,
99 ) -> RedisResult<&mut redis::aio::MultiplexedConnection> {
100 Self::get_connection(&mut self.conn, &self.options, node).await
101 }
102
103 #[inline]
104 pub async fn get_connection_for(
107 &mut self,
108 key: &str,
109 ) -> RedisResult<(&NodeId, &mut redis::aio::MultiplexedConnection)> {
110 let node = Self::get_node_for(&self.keys, &self.cluster, key);
111 Ok((
112 node,
113 Self::get_connection(&mut self.conn, &self.options, node).await?,
114 ))
115 }
116
117 async fn get_connection<'a>(
118 conn: &'a mut HashMap<NodeId, Connection>,
119 options: &Arc<RedisConnectOptions>,
120 node: &NodeId,
121 ) -> RedisResult<&'a mut redis::aio::MultiplexedConnection> {
122 assert!(!node.scheme().is_empty(), "Must have protocol");
123 assert!(node.host_str().is_some(), "Must have host");
124 assert!(node.port().is_some(), "Must have port");
125 if let Some(state) = conn.get_mut(node) {
126 state.get().await.ok();
127 } else {
128 conn.insert(
129 node.clone(),
130 Connection::create_or_reconnect(node.clone(), options.clone()).await?,
131 );
132 }
133 conn.get_mut(node).expect("Must exist").try_get()
134 }
135}