1use crate::cluster::{calculate_slot, ClusterTopology, RedirectHandler};
6use crate::commands::{
7 Command, DecrByCommand, DecrCommand, DelCommand, ExistsCommand, ExpireCommand, GetCommand,
8 IncrByCommand, IncrCommand, SetCommand, TtlCommand,
9};
10use crate::connection::{ConnectionManager, TopologyType};
11use crate::core::{
12 config::ConnectionConfig,
13 error::{RedisError, RedisResult},
14 value::RespValue,
15};
16use crate::pool::Pool;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::RwLock;
21use tracing::{debug, info, warn};
22
23#[derive(Clone)]
31pub struct Client {
32 topology_type: TopologyType,
33 config: ConnectionConfig,
34 standalone_pool: Option<Arc<Pool>>,
36 cluster_pools: Arc<RwLock<HashMap<String, Arc<Pool>>>>,
38 cluster_topology: Option<ClusterTopology>,
39 redirect_handler: Option<RedirectHandler>,
40}
41
42impl Client {
43 pub async fn connect(config: ConnectionConfig) -> RedisResult<Self> {
61 info!("Connecting to Redis...");
62
63 let mut conn_manager = ConnectionManager::new(config.clone());
64 let topology_type = conn_manager.get_topology().await?;
65
66 match topology_type {
67 TopologyType::Standalone => Self::connect_standalone(config, conn_manager).await,
68 TopologyType::Cluster => Self::connect_cluster(config, conn_manager).await,
69 }
70 }
71
72 async fn connect_standalone(
73 config: ConnectionConfig,
74 _conn_manager: ConnectionManager,
75 ) -> RedisResult<Self> {
76 info!("Connecting to Standalone Redis");
77
78 let endpoints = config.parse_endpoints();
79 if endpoints.is_empty() {
80 return Err(RedisError::Config("No endpoints specified".to_string()));
81 }
82
83 let (host, port) = endpoints[0].clone();
84 let pool = Pool::new(config.clone(), host, port).await?;
85
86 Ok(Self {
87 topology_type: TopologyType::Standalone,
88 config,
89 standalone_pool: Some(Arc::new(pool)),
90 cluster_pools: Arc::new(RwLock::new(HashMap::new())),
91 cluster_topology: None,
92 redirect_handler: None,
93 })
94 }
95
96 async fn connect_cluster(
97 config: ConnectionConfig,
98 _conn_manager: ConnectionManager,
99 ) -> RedisResult<Self> {
100 info!("Connecting to Redis Cluster");
101
102 let cluster_topology = ClusterTopology::new();
103 let redirect_handler = RedirectHandler::new(cluster_topology.clone(), config.max_redirects);
104
105 let endpoints = config.parse_endpoints();
107 if endpoints.is_empty() {
108 return Err(RedisError::Config("No endpoints specified".to_string()));
109 }
110
111 for (host, port) in &endpoints {
113 match Pool::new(config.clone(), host.clone(), *port).await {
114 Ok(pool) => {
115 let node_key = format!("{}:{}", host, port);
117 let mut pools = HashMap::new();
118 pools.insert(node_key, Arc::new(pool));
119
120 return Ok(Self {
121 topology_type: TopologyType::Cluster,
122 config,
123 standalone_pool: None,
124 cluster_pools: Arc::new(RwLock::new(pools)),
125 cluster_topology: Some(cluster_topology),
126 redirect_handler: Some(redirect_handler),
127 });
128 }
129 Err(e) => {
130 warn!(
131 "Failed to connect to cluster node {}:{}: {:?}",
132 host, port, e
133 );
134 }
136 }
137 }
138
139 Err(RedisError::Cluster(
140 "Failed to connect to any cluster node".to_string(),
141 ))
142 }
143
144 async fn execute_with_redirects<C: Command>(&self, command: C) -> RedisResult<C::Output> {
146 let mut retries = 0;
147 let max_retries = self.config.max_redirects;
148
149 loop {
150 let result = self.execute_command_internal(&command).await;
151
152 match result {
153 Err(ref e) if e.is_redirect() && retries < max_retries => {
154 retries += 1;
155 debug!(
156 "Handling redirect (attempt {}/{}): {:?}",
157 retries, max_retries, e
158 );
159
160 if let Some(ref handler) = self.redirect_handler {
161 let (host, port, is_ask) = handler.handle_redirect(e).await?;
162
163 self.ensure_node_pool(&host, port).await?;
165
166 if is_ask {
168 let node_key = format!("{}:{}", host, port);
169 if let Some(pool) = self.get_cluster_pool(&node_key).await {
170 let _ = pool.execute_command("ASKING".to_string(), vec![]).await?;
172 }
173 }
174
175 continue;
177 }
178
179 return Err(RedisError::Cluster(
181 "Redirect received but no handler available".to_string(),
182 ));
183 }
184 Err(e) if e.is_redirect() => {
185 return Err(RedisError::MaxRetriesExceeded(max_retries));
186 }
187 other => {
188 return other
189 .map(|resp| command.parse_response(resp))
190 .and_then(|x| x)
191 }
192 }
193 }
194 }
195
196 async fn execute_command_internal<C: Command>(&self, command: &C) -> RedisResult<RespValue> {
197 match self.topology_type {
198 TopologyType::Standalone => {
199 if let Some(ref pool) = self.standalone_pool {
200 pool.execute_command(command.command_name().to_string(), command.args())
201 .await
202 } else {
203 Err(RedisError::Connection(
204 "No standalone pool available".to_string(),
205 ))
206 }
207 }
208 TopologyType::Cluster => {
209 let keys = command.keys();
211 if keys.is_empty() {
212 return Err(RedisError::Cluster("Command has no keys".to_string()));
213 }
214
215 let slot = calculate_slot(keys[0]);
216 debug!("Command key slot: {}", slot);
217
218 let node_key = if let Some(ref topology) = self.cluster_topology {
220 if let Some((host, port)) = topology.get_node_for_slot(slot).await {
221 Some(format!("{}:{}", host, port))
222 } else {
223 None
224 }
225 } else {
226 None
227 };
228
229 let pool = if let Some(ref key) = node_key {
231 self.get_cluster_pool(key).await
232 } else {
233 self.get_any_cluster_pool().await
235 };
236
237 if let Some(pool) = pool {
238 pool.execute_command(command.command_name().to_string(), command.args())
239 .await
240 } else {
241 Err(RedisError::Cluster(
242 "No cluster pools available".to_string(),
243 ))
244 }
245 }
246 }
247 }
248
249 async fn get_cluster_pool(&self, node_key: &str) -> Option<Arc<Pool>> {
250 let pools = self.cluster_pools.read().await;
251 pools.get(node_key).cloned()
252 }
253
254 async fn get_any_cluster_pool(&self) -> Option<Arc<Pool>> {
255 let pools = self.cluster_pools.read().await;
256 pools.values().next().cloned()
257 }
258
259 async fn ensure_node_pool(&self, host: &str, port: u16) -> RedisResult<()> {
260 let node_key = format!("{}:{}", host, port);
261
262 {
264 let pools = self.cluster_pools.read().await;
265 if pools.contains_key(&node_key) {
266 return Ok(());
267 }
268 }
269
270 let pool = Pool::new(self.config.clone(), host.to_string(), port).await?;
272
273 let mut pools = self.cluster_pools.write().await;
275 pools.insert(node_key, Arc::new(pool));
276
277 Ok(())
278 }
279
280 pub async fn get(&self, key: impl Into<String>) -> RedisResult<Option<String>> {
284 let command = GetCommand::new(key);
285 self.execute_with_redirects(command).await
286 }
287
288 pub async fn set(&self, key: impl Into<String>, value: impl Into<String>) -> RedisResult<bool> {
290 let command = SetCommand::new(key, value);
291 self.execute_with_redirects(command).await
292 }
293
294 pub async fn set_ex(
296 &self,
297 key: impl Into<String>,
298 value: impl Into<String>,
299 expiration: Duration,
300 ) -> RedisResult<bool> {
301 let command = SetCommand::new(key, value).expire(expiration);
302 self.execute_with_redirects(command).await
303 }
304
305 pub async fn set_nx(
307 &self,
308 key: impl Into<String>,
309 value: impl Into<String>,
310 ) -> RedisResult<bool> {
311 let command = SetCommand::new(key, value).only_if_not_exists();
312 self.execute_with_redirects(command).await
313 }
314
315 pub async fn del(&self, keys: Vec<String>) -> RedisResult<i64> {
317 let command = DelCommand::new(keys);
318 self.execute_with_redirects(command).await
319 }
320
321 pub async fn exists(&self, keys: Vec<String>) -> RedisResult<i64> {
323 let command = ExistsCommand::new(keys);
324 self.execute_with_redirects(command).await
325 }
326
327 pub async fn expire(&self, key: impl Into<String>, duration: Duration) -> RedisResult<bool> {
329 let command = ExpireCommand::new(key, duration);
330 self.execute_with_redirects(command).await
331 }
332
333 pub async fn ttl(&self, key: impl Into<String>) -> RedisResult<Option<i64>> {
335 let command = TtlCommand::new(key);
336 self.execute_with_redirects(command).await
337 }
338
339 pub async fn incr(&self, key: impl Into<String>) -> RedisResult<i64> {
341 let command = IncrCommand::new(key);
342 self.execute_with_redirects(command).await
343 }
344
345 pub async fn decr(&self, key: impl Into<String>) -> RedisResult<i64> {
347 let command = DecrCommand::new(key);
348 self.execute_with_redirects(command).await
349 }
350
351 pub async fn incr_by(&self, key: impl Into<String>, increment: i64) -> RedisResult<i64> {
353 let command = IncrByCommand::new(key, increment);
354 self.execute_with_redirects(command).await
355 }
356
357 pub async fn decr_by(&self, key: impl Into<String>, decrement: i64) -> RedisResult<i64> {
359 let command = DecrByCommand::new(key, decrement);
360 self.execute_with_redirects(command).await
361 }
362
363 pub fn topology_type(&self) -> TopologyType {
365 self.topology_type
366 }
367}
368
#[cfg(test)]
mod tests {
    use super::*;

    /// A config built from a connection URL must carry a non-empty connection string.
    #[test]
    fn test_client_configuration() {
        let cfg = ConnectionConfig::new("redis://localhost:6379");
        assert!(!cfg.connection_string.is_empty());
    }
}