sea_streamer_redis/
cluster.rs

1use std::{collections::HashMap, fmt::Debug, sync::Arc};
2
3use crate::{Connection, RedisConnectOptions, RedisResult};
4use sea_streamer_types::{export::url::Url, StreamErr, StreamUrlErr, StreamerUri};
5
6/// ID of a node in a Redis Cluster.
7pub type NodeId = Url;
8
9#[derive(Debug)]
10/// A set of connections maintained to a Redis Cluster with key cache.
11pub struct RedisCluster {
12    pub(crate) cluster: StreamerUri,
13    pub(crate) options: Arc<RedisConnectOptions>,
14    pub(crate) conn: HashMap<NodeId, Connection>,
15    pub(crate) keys: HashMap<String, NodeId>,
16}
17
18impl RedisCluster {
19    /// Nothing happens until you call connect
20    pub fn new(cluster: StreamerUri, options: Arc<RedisConnectOptions>) -> RedisResult<Self> {
21        if cluster.nodes().is_empty() {
22            return Err(StreamErr::StreamUrlErr(StreamUrlErr::ZeroNode));
23        }
24        if cluster.protocol().is_none() {
25            return Err(StreamErr::StreamUrlErr(StreamUrlErr::ProtocolRequired));
26        }
27        Ok(Self {
28            cluster,
29            options,
30            conn: Default::default(),
31            keys: Default::default(),
32        })
33    }
34
35    pub fn protocol(&self) -> Option<&str> {
36        self.cluster.protocol()
37    }
38
39    /// Will drop all existing connections. This method returns OK only if it can connect to all nodes.
40    pub async fn reconnect_all(&mut self) -> RedisResult<()> {
41        self.conn = Default::default();
42        for node in self.cluster.nodes() {
43            let conn = Connection::create(node.clone(), self.options.clone()).await?;
44            self.conn.insert(node.clone(), conn);
45        }
46        Ok(())
47    }
48
49    /// An error has occured on the connection. Attempt to reconnect *later*.
50    pub fn reconnect(&mut self, node: &NodeId) -> RedisResult<()> {
51        if let Some(inner) = self.conn.get_mut(node) {
52            inner.reconnect();
53        }
54        Ok(())
55    }
56
57    #[inline]
58    /// Get the cached node for this key. There is no guarantee that the key assignment is right.
59    pub fn node_for(&self, key: &str) -> &NodeId {
60        Self::get_node_for(&self.keys, &self.cluster, key)
61    }
62
63    fn get_node_for<'a>(
64        keys: &'a HashMap<String, NodeId>,
65        cluster: &'a StreamerUri,
66        key: &str,
67    ) -> &'a NodeId {
68        if let Some(node) = keys.get(key) {
69            node
70        } else {
71            cluster.nodes().first().expect("Should not be empty")
72        }
73    }
74
75    /// Indicate that the particular key has been moved to a different node in the cluster.
76    pub fn moved(&mut self, key: &str, node: NodeId) {
77        if let Some(inner) = self.keys.get_mut(key) {
78            *inner = node;
79        } else {
80            self.keys.insert(key.to_owned(), node);
81        }
82    }
83
84    /// Get any available connection to the cluster
85    pub fn get_any(&mut self) -> RedisResult<(&NodeId, &mut redis::aio::MultiplexedConnection)> {
86        for (node, inner) in self.conn.iter_mut() {
87            if let Ok(conn) = inner.try_get() {
88                return Ok((node, conn));
89            }
90        }
91        Err(StreamErr::Connect("No open connections".to_owned()))
92    }
93
94    #[inline]
95    /// Get a connection to the specific node, will wait and retry a few times until dead.
96    pub async fn get(
97        &mut self,
98        node: &NodeId,
99    ) -> RedisResult<&mut redis::aio::MultiplexedConnection> {
100        Self::get_connection(&mut self.conn, &self.options, node).await
101    }
102
103    #[inline]
104    /// Get a connection that is assigned with the specific key, will wait and retry a few times until dead.
105    /// There is no guarantee that the key assignment is right.
106    pub async fn get_connection_for(
107        &mut self,
108        key: &str,
109    ) -> RedisResult<(&NodeId, &mut redis::aio::MultiplexedConnection)> {
110        let node = Self::get_node_for(&self.keys, &self.cluster, key);
111        Ok((
112            node,
113            Self::get_connection(&mut self.conn, &self.options, node).await?,
114        ))
115    }
116
117    async fn get_connection<'a>(
118        conn: &'a mut HashMap<NodeId, Connection>,
119        options: &Arc<RedisConnectOptions>,
120        node: &NodeId,
121    ) -> RedisResult<&'a mut redis::aio::MultiplexedConnection> {
122        assert!(!node.scheme().is_empty(), "Must have protocol");
123        assert!(node.host_str().is_some(), "Must have host");
124        assert!(node.port().is_some(), "Must have port");
125        if let Some(state) = conn.get_mut(node) {
126            state.get().await.ok();
127        } else {
128            conn.insert(
129                node.clone(),
130                Connection::create_or_reconnect(node.clone(), options.clone()).await?,
131            );
132        }
133        conn.get_mut(node).expect("Must exist").try_get()
134    }
135}