1use crate::cluster::{calculate_slot, ClusterTopology, RedirectHandler};
6use crate::commands::{
7 Command,
8 DecrByCommand,
9 DecrCommand,
10 DelCommand,
11 ExistsCommand,
12 ExpireCommand,
13 GetCommand,
14 HDelCommand,
15 HExistsCommand,
16 HGetAllCommand,
17 HGetCommand,
18 HLenCommand,
19 HMGetCommand,
20 HMSetCommand,
21 HSetCommand,
22 IncrByCommand,
23 IncrCommand,
24 LIndexCommand,
26 LLenCommand,
27 LPopCommand,
28 LPushCommand,
29 LRangeCommand,
30 LSetCommand,
31 RPopCommand,
32 RPushCommand,
33 SAddCommand,
35 SCardCommand,
36 SIsMemberCommand,
37 SMembersCommand,
38 SPopCommand,
39 SRandMemberCommand,
40 SRemCommand,
41 SetCommand,
42 TtlCommand,
43 ZAddCommand,
45 ZCardCommand,
46 ZRangeCommand,
47 ZRankCommand,
48 ZRemCommand,
49 ZRevRankCommand,
50 ZScoreCommand,
51};
52use crate::connection::{ConnectionManager, TopologyType};
53use crate::core::{
54 config::ConnectionConfig,
55 error::{RedisError, RedisResult},
56 value::RespValue,
57};
58use crate::pipeline::{Pipeline, PipelineCommand, PipelineExecutor};
59use crate::pool::Pool;
60use crate::pubsub::{PubSubConnection, Publisher, Subscriber};
61use crate::transaction::{Transaction, TransactionCommand, TransactionExecutor};
62use std::collections::HashMap;
63use std::sync::Arc;
64use std::time::Duration;
65use tokio::sync::RwLock;
66use tracing::{debug, info, warn};
67
68#[derive(Clone)]
76pub struct Client {
77 topology_type: TopologyType,
78 config: ConnectionConfig,
79 standalone_pool: Option<Arc<Pool>>,
81 cluster_pools: Arc<RwLock<HashMap<String, Arc<Pool>>>>,
83 cluster_topology: Option<ClusterTopology>,
84 redirect_handler: Option<RedirectHandler>,
85}
86
87impl Client {
88 pub async fn connect(config: ConnectionConfig) -> RedisResult<Self> {
106 info!("Connecting to Redis...");
107
108 let mut conn_manager = ConnectionManager::new(config.clone());
109 let topology_type = conn_manager.get_topology().await?;
110
111 match topology_type {
112 TopologyType::Standalone => Self::connect_standalone(config, conn_manager).await,
113 TopologyType::Cluster => Self::connect_cluster(config, conn_manager).await,
114 }
115 }
116
117 async fn connect_standalone(
118 config: ConnectionConfig,
119 _conn_manager: ConnectionManager,
120 ) -> RedisResult<Self> {
121 info!("Connecting to Standalone Redis");
122
123 let endpoints = config.parse_endpoints();
124 if endpoints.is_empty() {
125 return Err(RedisError::Config("No endpoints specified".to_string()));
126 }
127
128 let (host, port) = endpoints[0].clone();
129 let pool = Pool::new(config.clone(), host, port).await?;
130
131 Ok(Self {
132 topology_type: TopologyType::Standalone,
133 config,
134 standalone_pool: Some(Arc::new(pool)),
135 cluster_pools: Arc::new(RwLock::new(HashMap::new())),
136 cluster_topology: None,
137 redirect_handler: None,
138 })
139 }
140
141 async fn connect_cluster(
142 config: ConnectionConfig,
143 _conn_manager: ConnectionManager,
144 ) -> RedisResult<Self> {
145 info!("Connecting to Redis Cluster");
146
147 let cluster_topology = ClusterTopology::new();
148 let redirect_handler = RedirectHandler::new(cluster_topology.clone(), config.max_redirects);
149
150 let endpoints = config.parse_endpoints();
152 if endpoints.is_empty() {
153 return Err(RedisError::Config("No endpoints specified".to_string()));
154 }
155
156 for (host, port) in &endpoints {
158 match Pool::new(config.clone(), host.clone(), *port).await {
159 Ok(pool) => {
160 let node_key = format!("{}:{}", host, port);
162 let mut pools = HashMap::new();
163 pools.insert(node_key, Arc::new(pool));
164
165 return Ok(Self {
166 topology_type: TopologyType::Cluster,
167 config,
168 standalone_pool: None,
169 cluster_pools: Arc::new(RwLock::new(pools)),
170 cluster_topology: Some(cluster_topology),
171 redirect_handler: Some(redirect_handler),
172 });
173 }
174 Err(e) => {
175 warn!(
176 "Failed to connect to cluster node {}:{}: {:?}",
177 host, port, e
178 );
179 }
181 }
182 }
183
184 Err(RedisError::Cluster(
185 "Failed to connect to any cluster node".to_string(),
186 ))
187 }
188
189 async fn execute_with_redirects<C: Command>(&self, command: C) -> RedisResult<C::Output> {
191 let mut retries = 0;
192 let max_retries = self.config.max_redirects;
193
194 loop {
195 let result = self.execute_command_internal(&command).await;
196
197 match result {
198 Err(ref e) if e.is_redirect() && retries < max_retries => {
199 retries += 1;
200 debug!(
201 "Handling redirect (attempt {}/{}): {:?}",
202 retries, max_retries, e
203 );
204
205 if let Some(ref handler) = self.redirect_handler {
206 let (host, port, is_ask) = handler.handle_redirect(e).await?;
207
208 self.ensure_node_pool(&host, port).await?;
210
211 if is_ask {
213 let node_key = format!("{}:{}", host, port);
214 if let Some(pool) = self.get_cluster_pool(&node_key).await {
215 let _ = pool.execute_command("ASKING".to_string(), vec![]).await?;
217 }
218 }
219
220 continue;
222 }
223
224 return Err(RedisError::Cluster(
226 "Redirect received but no handler available".to_string(),
227 ));
228 }
229 Err(e) if e.is_redirect() => {
230 return Err(RedisError::MaxRetriesExceeded(max_retries));
231 }
232 other => {
233 return other
234 .map(|resp| command.parse_response(resp))
235 .and_then(|x| x)
236 }
237 }
238 }
239 }
240
241 async fn execute_command_internal<C: Command>(&self, command: &C) -> RedisResult<RespValue> {
242 match self.topology_type {
243 TopologyType::Standalone => {
244 if let Some(ref pool) = self.standalone_pool {
245 pool.execute_command(command.command_name().to_string(), command.args())
246 .await
247 } else {
248 Err(RedisError::Connection(
249 "No standalone pool available".to_string(),
250 ))
251 }
252 }
253 TopologyType::Cluster => {
254 let keys = command.keys();
256 if keys.is_empty() {
257 return Err(RedisError::Cluster("Command has no keys".to_string()));
258 }
259
260 let slot = calculate_slot(keys[0]);
261 debug!("Command key slot: {}", slot);
262
263 let node_key = if let Some(ref topology) = self.cluster_topology {
265 if let Some((host, port)) = topology.get_node_for_slot(slot).await {
266 Some(format!("{}:{}", host, port))
267 } else {
268 None
269 }
270 } else {
271 None
272 };
273
274 let pool = if let Some(ref key) = node_key {
276 self.get_cluster_pool(key).await
277 } else {
278 self.get_any_cluster_pool().await
280 };
281
282 if let Some(pool) = pool {
283 pool.execute_command(command.command_name().to_string(), command.args())
284 .await
285 } else {
286 Err(RedisError::Cluster(
287 "No cluster pools available".to_string(),
288 ))
289 }
290 }
291 }
292 }
293
294 async fn get_cluster_pool(&self, node_key: &str) -> Option<Arc<Pool>> {
295 let pools = self.cluster_pools.read().await;
296 pools.get(node_key).cloned()
297 }
298
299 async fn get_any_cluster_pool(&self) -> Option<Arc<Pool>> {
300 let pools = self.cluster_pools.read().await;
301 pools.values().next().cloned()
302 }
303
304 async fn ensure_node_pool(&self, host: &str, port: u16) -> RedisResult<()> {
305 let node_key = format!("{}:{}", host, port);
306
307 {
309 let pools = self.cluster_pools.read().await;
310 if pools.contains_key(&node_key) {
311 return Ok(());
312 }
313 }
314
315 let pool = Pool::new(self.config.clone(), host.to_string(), port).await?;
317
318 let mut pools = self.cluster_pools.write().await;
320 pools.insert(node_key, Arc::new(pool));
321
322 Ok(())
323 }
324
325 pub async fn get(&self, key: impl Into<String>) -> RedisResult<Option<String>> {
329 let command = GetCommand::new(key);
330 self.execute_with_redirects(command).await
331 }
332
333 pub async fn set(&self, key: impl Into<String>, value: impl Into<String>) -> RedisResult<bool> {
335 let command = SetCommand::new(key, value);
336 self.execute_with_redirects(command).await
337 }
338
339 pub async fn set_ex(
341 &self,
342 key: impl Into<String>,
343 value: impl Into<String>,
344 expiration: Duration,
345 ) -> RedisResult<bool> {
346 let command = SetCommand::new(key, value).expire(expiration);
347 self.execute_with_redirects(command).await
348 }
349
350 pub async fn set_nx(
352 &self,
353 key: impl Into<String>,
354 value: impl Into<String>,
355 ) -> RedisResult<bool> {
356 let command = SetCommand::new(key, value).only_if_not_exists();
357 self.execute_with_redirects(command).await
358 }
359
360 pub async fn del(&self, keys: Vec<String>) -> RedisResult<i64> {
362 let command = DelCommand::new(keys);
363 self.execute_with_redirects(command).await
364 }
365
366 pub async fn exists(&self, keys: Vec<String>) -> RedisResult<i64> {
368 let command = ExistsCommand::new(keys);
369 self.execute_with_redirects(command).await
370 }
371
372 pub async fn expire(&self, key: impl Into<String>, duration: Duration) -> RedisResult<bool> {
374 let command = ExpireCommand::new(key, duration);
375 self.execute_with_redirects(command).await
376 }
377
378 pub async fn ttl(&self, key: impl Into<String>) -> RedisResult<Option<i64>> {
380 let command = TtlCommand::new(key);
381 self.execute_with_redirects(command).await
382 }
383
384 pub async fn incr(&self, key: impl Into<String>) -> RedisResult<i64> {
386 let command = IncrCommand::new(key);
387 self.execute_with_redirects(command).await
388 }
389
390 pub async fn decr(&self, key: impl Into<String>) -> RedisResult<i64> {
392 let command = DecrCommand::new(key);
393 self.execute_with_redirects(command).await
394 }
395
396 pub async fn incr_by(&self, key: impl Into<String>, increment: i64) -> RedisResult<i64> {
398 let command = IncrByCommand::new(key, increment);
399 self.execute_with_redirects(command).await
400 }
401
402 pub async fn decr_by(&self, key: impl Into<String>, decrement: i64) -> RedisResult<i64> {
404 let command = DecrByCommand::new(key, decrement);
405 self.execute_with_redirects(command).await
406 }
407
408 pub async fn hget(
412 &self,
413 key: impl Into<String>,
414 field: impl Into<String>,
415 ) -> RedisResult<Option<String>> {
416 let command = HGetCommand::new(key, field);
417 self.execute_with_redirects(command).await
418 }
419
420 pub async fn hset(
422 &self,
423 key: impl Into<String>,
424 field: impl Into<String>,
425 value: impl Into<String>,
426 ) -> RedisResult<i64> {
427 let command = HSetCommand::new(key, field, value);
428 self.execute_with_redirects(command).await
429 }
430
431 pub async fn hdel(&self, key: impl Into<String>, fields: Vec<String>) -> RedisResult<i64> {
433 let command = HDelCommand::new(key, fields);
434 self.execute_with_redirects(command).await
435 }
436
437 pub async fn hgetall(
439 &self,
440 key: impl Into<String>,
441 ) -> RedisResult<std::collections::HashMap<String, String>> {
442 let command = HGetAllCommand::new(key);
443 self.execute_with_redirects(command).await
444 }
445
446 pub async fn hmget(
448 &self,
449 key: impl Into<String>,
450 fields: Vec<String>,
451 ) -> RedisResult<Vec<Option<String>>> {
452 let command = HMGetCommand::new(key, fields);
453 self.execute_with_redirects(command).await
454 }
455
456 pub async fn hmset(
458 &self,
459 key: impl Into<String>,
460 fields: std::collections::HashMap<String, String>,
461 ) -> RedisResult<String> {
462 let command = HMSetCommand::new(key, fields);
463 self.execute_with_redirects(command).await
464 }
465
466 pub async fn hlen(&self, key: impl Into<String>) -> RedisResult<i64> {
468 let command = HLenCommand::new(key);
469 self.execute_with_redirects(command).await
470 }
471
472 pub async fn hexists(
474 &self,
475 key: impl Into<String>,
476 field: impl Into<String>,
477 ) -> RedisResult<bool> {
478 let command = HExistsCommand::new(key, field);
479 self.execute_with_redirects(command).await
480 }
481
482 pub async fn lpush(&self, key: impl Into<String>, values: Vec<String>) -> RedisResult<i64> {
486 let command = LPushCommand::new(key, values);
487 self.execute_with_redirects(command).await
488 }
489
490 pub async fn rpush(&self, key: impl Into<String>, values: Vec<String>) -> RedisResult<i64> {
492 let command = RPushCommand::new(key, values);
493 self.execute_with_redirects(command).await
494 }
495
496 pub async fn lpop(&self, key: impl Into<String>) -> RedisResult<Option<String>> {
498 let command = LPopCommand::new(key);
499 self.execute_with_redirects(command).await
500 }
501
502 pub async fn rpop(&self, key: impl Into<String>) -> RedisResult<Option<String>> {
504 let command = RPopCommand::new(key);
505 self.execute_with_redirects(command).await
506 }
507
508 pub async fn lrange(
510 &self,
511 key: impl Into<String>,
512 start: i64,
513 stop: i64,
514 ) -> RedisResult<Vec<String>> {
515 let command = LRangeCommand::new(key, start, stop);
516 self.execute_with_redirects(command).await
517 }
518
519 pub async fn llen(&self, key: impl Into<String>) -> RedisResult<i64> {
521 let command = LLenCommand::new(key);
522 self.execute_with_redirects(command).await
523 }
524
525 pub async fn lindex(&self, key: impl Into<String>, index: i64) -> RedisResult<Option<String>> {
527 let command = LIndexCommand::new(key, index);
528 self.execute_with_redirects(command).await
529 }
530
531 pub async fn lset(
533 &self,
534 key: impl Into<String>,
535 index: i64,
536 value: impl Into<String>,
537 ) -> RedisResult<()> {
538 let command = LSetCommand::new(key, index, value);
539 let _result: String = self.execute_with_redirects(command).await?;
540 Ok(())
541 }
542
543 pub async fn sadd(&self, key: impl Into<String>, members: Vec<String>) -> RedisResult<i64> {
547 let command = SAddCommand::new(key, members);
548 self.execute_with_redirects(command).await
549 }
550
551 pub async fn srem(&self, key: impl Into<String>, members: Vec<String>) -> RedisResult<i64> {
553 let command = SRemCommand::new(key, members);
554 self.execute_with_redirects(command).await
555 }
556
557 pub async fn smembers(
559 &self,
560 key: impl Into<String>,
561 ) -> RedisResult<std::collections::HashSet<String>> {
562 let command = SMembersCommand::new(key);
563 self.execute_with_redirects(command).await
564 }
565
566 pub async fn sismember(
568 &self,
569 key: impl Into<String>,
570 member: impl Into<String>,
571 ) -> RedisResult<bool> {
572 let command = SIsMemberCommand::new(key, member);
573 self.execute_with_redirects(command).await
574 }
575
576 pub async fn scard(&self, key: impl Into<String>) -> RedisResult<i64> {
578 let command = SCardCommand::new(key);
579 self.execute_with_redirects(command).await
580 }
581
582 pub async fn spop(&self, key: impl Into<String>) -> RedisResult<Option<String>> {
584 let command = SPopCommand::new(key);
585 self.execute_with_redirects(command).await
586 }
587
588 pub async fn srandmember(&self, key: impl Into<String>) -> RedisResult<Option<String>> {
590 let command = SRandMemberCommand::new(key);
591 self.execute_with_redirects(command).await
592 }
593
594 pub async fn zadd(
598 &self,
599 key: impl Into<String>,
600 members: std::collections::HashMap<String, f64>,
601 ) -> RedisResult<i64> {
602 let command = ZAddCommand::new(key, members);
603 self.execute_with_redirects(command).await
604 }
605
606 pub async fn zrem(&self, key: impl Into<String>, members: Vec<String>) -> RedisResult<i64> {
608 let command = ZRemCommand::new(key, members);
609 self.execute_with_redirects(command).await
610 }
611
612 pub async fn zrange(
614 &self,
615 key: impl Into<String>,
616 start: i64,
617 stop: i64,
618 ) -> RedisResult<Vec<String>> {
619 let command = ZRangeCommand::new(key, start, stop);
620 self.execute_with_redirects(command).await
621 }
622
623 pub async fn zscore(
625 &self,
626 key: impl Into<String>,
627 member: impl Into<String>,
628 ) -> RedisResult<Option<f64>> {
629 let command = ZScoreCommand::new(key, member);
630 self.execute_with_redirects(command).await
631 }
632
633 pub async fn zcard(&self, key: impl Into<String>) -> RedisResult<i64> {
635 let command = ZCardCommand::new(key);
636 self.execute_with_redirects(command).await
637 }
638
639 pub async fn zrank(
641 &self,
642 key: impl Into<String>,
643 member: impl Into<String>,
644 ) -> RedisResult<Option<i64>> {
645 let command = ZRankCommand::new(key, member);
646 self.execute_with_redirects(command).await
647 }
648
649 pub async fn zrevrank(
651 &self,
652 key: impl Into<String>,
653 member: impl Into<String>,
654 ) -> RedisResult<Option<i64>> {
655 let command = ZRevRankCommand::new(key, member);
656 self.execute_with_redirects(command).await
657 }
658
659 pub fn pipeline(&self) -> Pipeline {
685 let client_executor = ClientPipelineExecutor {
686 client: self.clone(),
687 };
688 Pipeline::new(Arc::new(tokio::sync::Mutex::new(client_executor)))
689 }
690
691 pub async fn transaction(&self) -> RedisResult<Transaction> {
717 let client_executor = ClientTransactionExecutor {
718 client: self.clone(),
719 };
720 Ok(Transaction::new(Arc::new(tokio::sync::Mutex::new(
721 client_executor,
722 ))))
723 }
724
725 pub async fn publish(
745 &self,
746 channel: impl Into<String>,
747 message: impl Into<String>,
748 ) -> RedisResult<i64> {
749 let channel = channel.into();
750 let message = message.into();
751
752 let args = vec![
753 RespValue::from(channel.as_str()),
754 RespValue::from(message.as_str()),
755 ];
756
757 match self.topology_type {
758 TopologyType::Standalone => {
759 if let Some(pool) = &self.standalone_pool {
760 let result = pool.execute_command("PUBLISH".to_string(), args).await?;
761 result.as_int()
762 } else {
763 Err(RedisError::Connection(
764 "No standalone pool available".to_string(),
765 ))
766 }
767 }
768 TopologyType::Cluster => {
769 let pools = self.cluster_pools.read().await;
771 if let Some((_, pool)) = pools.iter().next() {
772 let result = pool.execute_command("PUBLISH".to_string(), args).await?;
773 result.as_int()
774 } else {
775 Err(RedisError::Cluster(
776 "No cluster nodes available".to_string(),
777 ))
778 }
779 }
780 }
781 }
782
783 pub async fn subscriber(&self) -> RedisResult<Subscriber> {
806 let client_connection = ClientPubSubConnection {
807 client: self.clone(),
808 };
809 Ok(Subscriber::new(Arc::new(tokio::sync::Mutex::new(
810 client_connection,
811 ))))
812 }
813
814 pub async fn publisher(&self) -> RedisResult<Publisher> {
833 let client_connection = ClientPubSubConnection {
834 client: self.clone(),
835 };
836 Ok(Publisher::new(Arc::new(tokio::sync::Mutex::new(
837 client_connection,
838 ))))
839 }
840
841 pub async fn eval<T>(
872 &self,
873 script: &str,
874 keys: Vec<String>,
875 args: Vec<String>,
876 ) -> RedisResult<T>
877 where
878 T: std::convert::TryFrom<RespValue>,
879 T::Error: Into<RedisError>,
880 {
881 let mut cmd_args = vec![
882 RespValue::from(script),
883 RespValue::from(keys.len().to_string()),
884 ];
885
886 for key in keys {
888 cmd_args.push(RespValue::from(key));
889 }
890
891 for arg in args {
893 cmd_args.push(RespValue::from(arg));
894 }
895
896 let result = match self.topology_type {
897 TopologyType::Standalone => {
898 if let Some(pool) = &self.standalone_pool {
899 pool.execute_command("EVAL".to_string(), cmd_args).await?
900 } else {
901 return Err(RedisError::Connection(
902 "No standalone pool available".to_string(),
903 ));
904 }
905 }
906 TopologyType::Cluster => {
907 let pools = self.cluster_pools.read().await;
909 if let Some((_, pool)) = pools.iter().next() {
910 pool.execute_command("EVAL".to_string(), cmd_args).await?
911 } else {
912 return Err(RedisError::Cluster(
913 "No cluster nodes available".to_string(),
914 ));
915 }
916 }
917 };
918
919 T::try_from(result).map_err(Into::into)
920 }
921
922 pub async fn evalsha<T>(
955 &self,
956 sha: &str,
957 keys: Vec<String>,
958 args: Vec<String>,
959 ) -> RedisResult<T>
960 where
961 T: std::convert::TryFrom<RespValue>,
962 T::Error: Into<RedisError>,
963 {
964 let mut cmd_args = vec![
965 RespValue::from(sha),
966 RespValue::from(keys.len().to_string()),
967 ];
968
969 for key in keys {
971 cmd_args.push(RespValue::from(key));
972 }
973
974 for arg in args {
976 cmd_args.push(RespValue::from(arg));
977 }
978
979 let result = match self.topology_type {
980 TopologyType::Standalone => {
981 if let Some(pool) = &self.standalone_pool {
982 pool.execute_command("EVALSHA".to_string(), cmd_args)
983 .await?
984 } else {
985 return Err(RedisError::Connection(
986 "No standalone pool available".to_string(),
987 ));
988 }
989 }
990 TopologyType::Cluster => {
991 let pools = self.cluster_pools.read().await;
993 if let Some((_, pool)) = pools.iter().next() {
994 pool.execute_command("EVALSHA".to_string(), cmd_args)
995 .await?
996 } else {
997 return Err(RedisError::Cluster(
998 "No cluster nodes available".to_string(),
999 ));
1000 }
1001 }
1002 };
1003
1004 T::try_from(result).map_err(Into::into)
1005 }
1006
1007 pub async fn script_load(&self, script: &str) -> RedisResult<String> {
1028 let result = match self.topology_type {
1029 TopologyType::Standalone => {
1030 if let Some(pool) = &self.standalone_pool {
1031 pool.execute_command(
1032 "SCRIPT".to_string(),
1033 vec![RespValue::from("LOAD"), RespValue::from(script)],
1034 )
1035 .await?
1036 } else {
1037 return Err(RedisError::Connection(
1038 "No standalone pool available".to_string(),
1039 ));
1040 }
1041 }
1042 TopologyType::Cluster => {
1043 let pools = self.cluster_pools.read().await;
1045 let mut sha = String::new();
1046
1047 for (_, pool) in pools.iter() {
1048 let result = pool
1049 .execute_command(
1050 "SCRIPT".to_string(),
1051 vec![RespValue::from("LOAD"), RespValue::from(script)],
1052 )
1053 .await?;
1054 sha = result.as_string()?;
1055 }
1056
1057 if sha.is_empty() {
1058 return Err(RedisError::Cluster(
1059 "No cluster nodes available".to_string(),
1060 ));
1061 }
1062
1063 return Ok(sha);
1064 }
1065 };
1066
1067 result.as_string()
1068 }
1069
1070 pub async fn script_exists(&self, shas: Vec<String>) -> RedisResult<Vec<bool>> {
1091 let mut cmd_args = vec![RespValue::from("EXISTS")];
1092 for sha in shas {
1093 cmd_args.push(RespValue::from(sha));
1094 }
1095
1096 let result = match self.topology_type {
1097 TopologyType::Standalone => {
1098 if let Some(pool) = &self.standalone_pool {
1099 pool.execute_command("SCRIPT".to_string(), cmd_args).await?
1100 } else {
1101 return Err(RedisError::Connection(
1102 "No standalone pool available".to_string(),
1103 ));
1104 }
1105 }
1106 TopologyType::Cluster => {
1107 let pools = self.cluster_pools.read().await;
1109 if let Some((_, pool)) = pools.iter().next() {
1110 pool.execute_command("SCRIPT".to_string(), cmd_args).await?
1111 } else {
1112 return Err(RedisError::Cluster(
1113 "No cluster nodes available".to_string(),
1114 ));
1115 }
1116 }
1117 };
1118
1119 match result {
1120 RespValue::Array(items) => {
1121 let mut exists = Vec::new();
1122 for item in items {
1123 match item {
1124 RespValue::Integer(1) => exists.push(true),
1125 RespValue::Integer(0) => exists.push(false),
1126 _ => {
1127 return Err(RedisError::Type(format!(
1128 "Unexpected response in SCRIPT EXISTS: {:?}",
1129 item
1130 )))
1131 }
1132 }
1133 }
1134 Ok(exists)
1135 }
1136 _ => Err(RedisError::Type(format!(
1137 "Unexpected response type for SCRIPT EXISTS: {:?}",
1138 result
1139 ))),
1140 }
1141 }
1142
1143 pub async fn script_flush(&self) -> RedisResult<()> {
1161 let cmd_args = vec![RespValue::from("FLUSH")];
1162
1163 match self.topology_type {
1164 TopologyType::Standalone => {
1165 if let Some(pool) = &self.standalone_pool {
1166 let _result = pool.execute_command("SCRIPT".to_string(), cmd_args).await?;
1167 Ok(())
1168 } else {
1169 Err(RedisError::Connection(
1170 "No standalone pool available".to_string(),
1171 ))
1172 }
1173 }
1174 TopologyType::Cluster => {
1175 let pools = self.cluster_pools.read().await;
1177 for (_, pool) in pools.iter() {
1178 let _result = pool
1179 .execute_command("SCRIPT".to_string(), cmd_args.clone())
1180 .await?;
1181 }
1182 Ok(())
1183 }
1184 }
1185 }
1186
1187 pub async fn xadd(
1218 &self,
1219 stream: impl Into<String>,
1220 id: impl Into<String>,
1221 fields: std::collections::HashMap<String, String>,
1222 ) -> RedisResult<String> {
1223 let stream = stream.into();
1224 let id = id.into();
1225
1226 let mut cmd_args = vec![RespValue::from(stream.clone()), RespValue::from(id)];
1227
1228 for (field, value) in fields {
1230 cmd_args.push(RespValue::from(field));
1231 cmd_args.push(RespValue::from(value));
1232 }
1233
1234 let result = match self.topology_type {
1235 TopologyType::Standalone => {
1236 if let Some(pool) = &self.standalone_pool {
1237 pool.execute_command("XADD".to_string(), cmd_args).await?
1238 } else {
1239 return Err(RedisError::Connection(
1240 "No standalone pool available".to_string(),
1241 ));
1242 }
1243 }
1244 TopologyType::Cluster => {
1245 let slot = calculate_slot(stream.as_bytes());
1247
1248 let node_key = if let Some(ref topology) = self.cluster_topology {
1250 if let Some((host, port)) = topology.get_node_for_slot(slot).await {
1251 Some(format!("{}:{}", host, port))
1252 } else {
1253 None
1254 }
1255 } else {
1256 None
1257 };
1258
1259 if let Some(node_key) = node_key {
1260 if let Some(pool) = self.get_cluster_pool(&node_key).await {
1261 pool.execute_command("XADD".to_string(), cmd_args).await?
1262 } else {
1263 return Err(RedisError::Cluster(format!(
1264 "Pool not found for node: {}",
1265 node_key
1266 )));
1267 }
1268 } else {
1269 return Err(RedisError::Cluster(format!(
1270 "No node found for slot: {}",
1271 slot
1272 )));
1273 }
1274 }
1275 };
1276
1277 result.as_string()
1278 }
1279
1280 pub async fn xread(
1311 &self,
1312 streams: Vec<(String, String)>,
1313 count: Option<u64>,
1314 block: Option<Duration>,
1315 ) -> RedisResult<std::collections::HashMap<String, Vec<crate::streams::StreamEntry>>> {
1316 let mut cmd_args = vec![];
1317
1318 if let Some(count) = count {
1320 cmd_args.push(RespValue::from("COUNT"));
1321 cmd_args.push(RespValue::from(count.to_string()));
1322 }
1323
1324 if let Some(block) = block {
1326 cmd_args.push(RespValue::from("BLOCK"));
1327 cmd_args.push(RespValue::from(block.as_millis().to_string()));
1328 }
1329
1330 cmd_args.push(RespValue::from("STREAMS"));
1332
1333 for (stream, _) in &streams {
1335 cmd_args.push(RespValue::from(stream.clone()));
1336 }
1337
1338 for (_, id) in &streams {
1340 cmd_args.push(RespValue::from(id.clone()));
1341 }
1342
1343 let result = match self.topology_type {
1344 TopologyType::Standalone => {
1345 if let Some(pool) = &self.standalone_pool {
1346 pool.execute_command("XREAD".to_string(), cmd_args).await?
1347 } else {
1348 return Err(RedisError::Connection(
1349 "No standalone pool available".to_string(),
1350 ));
1351 }
1352 }
1353 TopologyType::Cluster => {
1354 let pools = self.cluster_pools.read().await;
1356 if let Some((_, pool)) = pools.iter().next() {
1357 pool.execute_command("XREAD".to_string(), cmd_args).await?
1358 } else {
1359 return Err(RedisError::Cluster(
1360 "No cluster nodes available".to_string(),
1361 ));
1362 }
1363 }
1364 };
1365
1366 crate::streams::parse_xread_response(result)
1367 }
1368
1369 pub async fn xrange(
1400 &self,
1401 stream: impl Into<String>,
1402 start: impl Into<String>,
1403 end: impl Into<String>,
1404 count: Option<u64>,
1405 ) -> RedisResult<Vec<crate::streams::StreamEntry>> {
1406 let stream = stream.into();
1407 let mut cmd_args = vec![
1408 RespValue::from(stream.clone()),
1409 RespValue::from(start.into()),
1410 RespValue::from(end.into()),
1411 ];
1412
1413 if let Some(count) = count {
1414 cmd_args.push(RespValue::from("COUNT"));
1415 cmd_args.push(RespValue::from(count.to_string()));
1416 }
1417
1418 let result = match self.topology_type {
1419 TopologyType::Standalone => {
1420 if let Some(pool) = &self.standalone_pool {
1421 pool.execute_command("XRANGE".to_string(), cmd_args).await?
1422 } else {
1423 return Err(RedisError::Connection(
1424 "No standalone pool available".to_string(),
1425 ));
1426 }
1427 }
1428 TopologyType::Cluster => {
1429 let slot = calculate_slot(stream.as_bytes());
1431 let node_key = if let Some(ref topology) = self.cluster_topology {
1433 if let Some((host, port)) = topology.get_node_for_slot(slot).await {
1434 Some(format!("{}:{}", host, port))
1435 } else {
1436 None
1437 }
1438 } else {
1439 None
1440 };
1441
1442 if let Some(node_key) = node_key {
1443 if let Some(pool) = self.get_cluster_pool(&node_key).await {
1444 pool.execute_command("XRANGE".to_string(), cmd_args).await?
1445 } else {
1446 return Err(RedisError::Cluster(format!(
1447 "Pool not found for node: {}",
1448 node_key
1449 )));
1450 }
1451 } else {
1452 return Err(RedisError::Cluster(format!(
1453 "No node found for slot: {}",
1454 slot
1455 )));
1456 }
1457 }
1458 };
1459
1460 crate::streams::parse_stream_entries(result)
1461 }
1462
1463 pub async fn xlen(&self, stream: impl Into<String>) -> RedisResult<u64> {
1481 let stream = stream.into();
1482 let cmd_args = vec![RespValue::from(stream.clone())];
1483
1484 let result = match self.topology_type {
1485 TopologyType::Standalone => {
1486 if let Some(pool) = &self.standalone_pool {
1487 pool.execute_command("XLEN".to_string(), cmd_args).await?
1488 } else {
1489 return Err(RedisError::Connection(
1490 "No standalone pool available".to_string(),
1491 ));
1492 }
1493 }
1494 TopologyType::Cluster => {
1495 let slot = calculate_slot(stream.as_bytes());
1497 let node_key = if let Some(ref topology) = self.cluster_topology {
1499 if let Some((host, port)) = topology.get_node_for_slot(slot).await {
1500 Some(format!("{}:{}", host, port))
1501 } else {
1502 None
1503 }
1504 } else {
1505 None
1506 };
1507
1508 if let Some(node_key) = node_key {
1509 if let Some(pool) = self.get_cluster_pool(&node_key).await {
1510 pool.execute_command("XLEN".to_string(), cmd_args).await?
1511 } else {
1512 return Err(RedisError::Cluster(format!(
1513 "Pool not found for node: {}",
1514 node_key
1515 )));
1516 }
1517 } else {
1518 return Err(RedisError::Cluster(format!(
1519 "No node found for slot: {}",
1520 slot
1521 )));
1522 }
1523 }
1524 };
1525
1526 Ok(result.as_int()? as u64)
1527 }
1528
1529 pub async fn xgroup_create(
1555 &self,
1556 stream: impl Into<String>,
1557 group: impl Into<String>,
1558 id: impl Into<String>,
1559 mkstream: bool,
1560 ) -> RedisResult<()> {
1561 let stream = stream.into();
1562 let mut cmd_args = vec![
1563 RespValue::from("CREATE"),
1564 RespValue::from(stream.clone()),
1565 RespValue::from(group.into()),
1566 RespValue::from(id.into()),
1567 ];
1568
1569 if mkstream {
1570 cmd_args.push(RespValue::from("MKSTREAM"));
1571 }
1572
1573 let result = match self.topology_type {
1574 TopologyType::Standalone => {
1575 if let Some(pool) = &self.standalone_pool {
1576 pool.execute_command("XGROUP".to_string(), cmd_args).await?
1577 } else {
1578 return Err(RedisError::Connection(
1579 "No standalone pool available".to_string(),
1580 ));
1581 }
1582 }
1583 TopologyType::Cluster => {
1584 let slot = calculate_slot(stream.as_bytes());
1586 let node_key = if let Some(ref topology) = self.cluster_topology {
1588 if let Some((host, port)) = topology.get_node_for_slot(slot).await {
1589 Some(format!("{}:{}", host, port))
1590 } else {
1591 None
1592 }
1593 } else {
1594 None
1595 };
1596
1597 if let Some(node_key) = node_key {
1598 if let Some(pool) = self.get_cluster_pool(&node_key).await {
1599 pool.execute_command("XGROUP".to_string(), cmd_args).await?
1600 } else {
1601 return Err(RedisError::Cluster(format!(
1602 "Pool not found for node: {}",
1603 node_key
1604 )));
1605 }
1606 } else {
1607 return Err(RedisError::Cluster(format!(
1608 "No node found for slot: {}",
1609 slot
1610 )));
1611 }
1612 }
1613 };
1614
1615 match result.as_string()?.as_str() {
1617 "OK" => Ok(()),
1618 other => Err(RedisError::Protocol(format!(
1619 "Unexpected XGROUP CREATE response: {}",
1620 other
1621 ))),
1622 }
1623 }
1624
1625 pub async fn xreadgroup(
1666 &self,
1667 group: impl Into<String>,
1668 consumer: impl Into<String>,
1669 streams: Vec<(String, String)>,
1670 count: Option<u64>,
1671 block: Option<Duration>,
1672 ) -> RedisResult<std::collections::HashMap<String, Vec<crate::streams::StreamEntry>>> {
1673 let mut cmd_args = vec![
1674 RespValue::from("GROUP"),
1675 RespValue::from(group.into()),
1676 RespValue::from(consumer.into()),
1677 ];
1678
1679 if let Some(count) = count {
1681 cmd_args.push(RespValue::from("COUNT"));
1682 cmd_args.push(RespValue::from(count.to_string()));
1683 }
1684
1685 if let Some(block) = block {
1687 cmd_args.push(RespValue::from("BLOCK"));
1688 cmd_args.push(RespValue::from(block.as_millis().to_string()));
1689 }
1690
1691 cmd_args.push(RespValue::from("STREAMS"));
1693
1694 for (stream, _) in &streams {
1696 cmd_args.push(RespValue::from(stream.clone()));
1697 }
1698
1699 for (_, id) in &streams {
1701 cmd_args.push(RespValue::from(id.clone()));
1702 }
1703
1704 let result = match self.topology_type {
1705 TopologyType::Standalone => {
1706 if let Some(pool) = &self.standalone_pool {
1707 pool.execute_command("XREADGROUP".to_string(), cmd_args)
1708 .await?
1709 } else {
1710 return Err(RedisError::Connection(
1711 "No standalone pool available".to_string(),
1712 ));
1713 }
1714 }
1715 TopologyType::Cluster => {
1716 let pools = self.cluster_pools.read().await;
1718 if let Some((_, pool)) = pools.iter().next() {
1719 pool.execute_command("XREADGROUP".to_string(), cmd_args)
1720 .await?
1721 } else {
1722 return Err(RedisError::Cluster(
1723 "No cluster nodes available".to_string(),
1724 ));
1725 }
1726 }
1727 };
1728
1729 crate::streams::parse_xread_response(result)
1730 }
1731
1732 pub async fn xack(
1760 &self,
1761 stream: impl Into<String>,
1762 group: impl Into<String>,
1763 ids: Vec<String>,
1764 ) -> RedisResult<u64> {
1765 let stream = stream.into();
1766 let mut cmd_args = vec![
1767 RespValue::from(stream.clone()),
1768 RespValue::from(group.into()),
1769 ];
1770
1771 for id in ids {
1772 cmd_args.push(RespValue::from(id));
1773 }
1774
1775 let result = match self.topology_type {
1776 TopologyType::Standalone => {
1777 if let Some(pool) = &self.standalone_pool {
1778 pool.execute_command("XACK".to_string(), cmd_args).await?
1779 } else {
1780 return Err(RedisError::Connection(
1781 "No standalone pool available".to_string(),
1782 ));
1783 }
1784 }
1785 TopologyType::Cluster => {
1786 let slot = calculate_slot(stream.as_bytes());
1788 let node_key = if let Some(ref topology) = self.cluster_topology {
1790 if let Some((host, port)) = topology.get_node_for_slot(slot).await {
1791 Some(format!("{}:{}", host, port))
1792 } else {
1793 None
1794 }
1795 } else {
1796 None
1797 };
1798
1799 if let Some(node_key) = node_key {
1800 if let Some(pool) = self.get_cluster_pool(&node_key).await {
1801 pool.execute_command("XACK".to_string(), cmd_args).await?
1802 } else {
1803 return Err(RedisError::Cluster(format!(
1804 "Pool not found for node: {}",
1805 node_key
1806 )));
1807 }
1808 } else {
1809 return Err(RedisError::Cluster(format!(
1810 "No node found for slot: {}",
1811 slot
1812 )));
1813 }
1814 }
1815 };
1816
1817 Ok(result.as_int()? as u64)
1818 }
1819
1820 pub fn topology_type(&self) -> TopologyType {
1822 self.topology_type
1823 }
1824}
1825
1826struct ClientPipelineExecutor {
1828 client: Client,
1829}
1830
1831#[async_trait::async_trait]
1832impl PipelineExecutor for ClientPipelineExecutor {
1833 async fn execute_pipeline(
1834 &mut self,
1835 commands: Vec<Box<dyn PipelineCommand>>,
1836 ) -> RedisResult<Vec<RespValue>> {
1837 if commands.is_empty() {
1838 return Ok(Vec::new());
1839 }
1840
1841 let first_command = &commands[0];
1844 let first_key = first_command.key();
1845
1846 match self.client.topology_type {
1847 TopologyType::Standalone => {
1848 if let Some(pool) = &self.client.standalone_pool {
1850 self.execute_pipeline_on_pool(pool, commands).await
1851 } else {
1852 Err(RedisError::Connection(
1853 "No standalone pool available".to_string(),
1854 ))
1855 }
1856 }
1857 TopologyType::Cluster => {
1858 if let Some(key) = first_key {
1862 let slot = calculate_slot(key.as_bytes());
1863 let node_addr = self.get_node_for_slot(slot).await?;
1864 let pool = self.get_or_create_pool(&node_addr).await?;
1865 self.execute_pipeline_on_pool(&pool, commands).await
1866 } else {
1867 let pools = self.client.cluster_pools.read().await;
1869 if let Some((_, pool)) = pools.iter().next() {
1870 self.execute_pipeline_on_pool(pool, commands).await
1871 } else {
1872 Err(RedisError::Cluster(
1873 "No cluster nodes available".to_string(),
1874 ))
1875 }
1876 }
1877 }
1878 }
1879 }
1880}
1881
1882impl ClientPipelineExecutor {
1883 async fn execute_pipeline_on_pool(
1885 &self,
1886 pool: &Arc<Pool>,
1887 commands: Vec<Box<dyn PipelineCommand>>,
1888 ) -> RedisResult<Vec<RespValue>> {
1889 let mut pipeline_args = Vec::new();
1891
1892 for command in commands {
1893 let mut cmd_args = vec![RespValue::from(command.name())];
1894 cmd_args.extend(command.args());
1895 pipeline_args.push(RespValue::Array(cmd_args));
1896 }
1897
1898 let mut results = Vec::new();
1900 for cmd_array in pipeline_args {
1901 if let RespValue::Array(args) = cmd_array {
1902 if let Some(RespValue::BulkString(cmd_name)) = args.first() {
1903 let command = String::from_utf8_lossy(cmd_name).to_string();
1904 let cmd_args = args.into_iter().skip(1).collect();
1905
1906 let result = pool.execute_command(command, cmd_args).await?;
1909 results.push(result);
1910 } else if let Some(RespValue::SimpleString(cmd_name)) = args.first() {
1911 let command = cmd_name.clone();
1912 let cmd_args = args.into_iter().skip(1).collect();
1913
1914 let result = pool.execute_command(command, cmd_args).await?;
1915 results.push(result);
1916 }
1917 }
1918 }
1919
1920 Ok(results)
1921 }
1922
1923 async fn get_node_for_slot(&self, slot: u16) -> RedisResult<String> {
1925 if let Some(topology) = &self.client.cluster_topology {
1926 if let Some((host, port)) = topology.get_node_for_slot(slot).await {
1927 Ok(format!("{}:{}", host, port))
1928 } else {
1929 Err(RedisError::Cluster(format!(
1930 "No node found for slot {}",
1931 slot
1932 )))
1933 }
1934 } else {
1935 Err(RedisError::Cluster(
1936 "No cluster topology available".to_string(),
1937 ))
1938 }
1939 }
1940
1941 async fn get_or_create_pool(&self, node_addr: &str) -> RedisResult<Arc<Pool>> {
1943 let pools = self.client.cluster_pools.read().await;
1944 if let Some(pool) = pools.get(node_addr) {
1945 Ok(pool.clone())
1946 } else {
1947 drop(pools);
1948
1949 let mut pools = self.client.cluster_pools.write().await;
1951
1952 if let Some(pool) = pools.get(node_addr) {
1954 return Ok(pool.clone());
1955 }
1956
1957 let parts: Vec<&str> = node_addr.split(':').collect();
1959 if parts.len() != 2 {
1960 return Err(RedisError::Config(format!(
1961 "Invalid node address: {}",
1962 node_addr
1963 )));
1964 }
1965
1966 let host = parts[0];
1967 let port: u16 = parts[1].parse().map_err(|_| {
1968 RedisError::Config(format!("Invalid port in address: {}", node_addr))
1969 })?;
1970
1971 let node_config = self.client.config.clone();
1973
1974 let pool = Arc::new(Pool::new(node_config, host.to_string(), port).await?);
1975 pools.insert(node_addr.to_string(), pool.clone());
1976
1977 Ok(pool)
1978 }
1979 }
1980}
1981
1982struct ClientTransactionExecutor {
1984 client: Client,
1985}
1986
1987#[async_trait::async_trait]
1988impl TransactionExecutor for ClientTransactionExecutor {
1989 async fn multi(&mut self) -> RedisResult<()> {
1990 match self.client.topology_type {
1992 TopologyType::Standalone => {
1993 if let Some(pool) = &self.client.standalone_pool {
1994 let _result = pool.execute_command("MULTI".to_string(), vec![]).await?;
1995 Ok(())
1996 } else {
1997 Err(RedisError::Connection(
1998 "No standalone pool available".to_string(),
1999 ))
2000 }
2001 }
2002 TopologyType::Cluster => {
2003 let pools = self.client.cluster_pools.read().await;
2005 if let Some((_, pool)) = pools.iter().next() {
2006 let _result = pool.execute_command("MULTI".to_string(), vec![]).await?;
2007 Ok(())
2008 } else {
2009 Err(RedisError::Cluster(
2010 "No cluster nodes available".to_string(),
2011 ))
2012 }
2013 }
2014 }
2015 }
2016
2017 async fn queue_command(&mut self, command: Box<dyn TransactionCommand>) -> RedisResult<()> {
2018 let cmd_name = command.name().to_string();
2020 let cmd_args = command.args();
2021
2022 match self.client.topology_type {
2023 TopologyType::Standalone => {
2024 if let Some(pool) = &self.client.standalone_pool {
2025 let _result = pool.execute_command(cmd_name, cmd_args).await?;
2026 Ok(())
2027 } else {
2028 Err(RedisError::Connection(
2029 "No standalone pool available".to_string(),
2030 ))
2031 }
2032 }
2033 TopologyType::Cluster => {
2034 if let Some(key) = command.key() {
2036 let slot = calculate_slot(key.as_bytes());
2037 let node_addr = self.get_node_for_slot(slot).await?;
2038 let pool = self.get_or_create_pool(&node_addr).await?;
2039 let _result = pool.execute_command(cmd_name, cmd_args).await?;
2040 Ok(())
2041 } else {
2042 let pools = self.client.cluster_pools.read().await;
2044 if let Some((_, pool)) = pools.iter().next() {
2045 let _result = pool.execute_command(cmd_name, cmd_args).await?;
2046 Ok(())
2047 } else {
2048 Err(RedisError::Cluster(
2049 "No cluster nodes available".to_string(),
2050 ))
2051 }
2052 }
2053 }
2054 }
2055 }
2056
2057 async fn exec(&mut self) -> RedisResult<Vec<RespValue>> {
2058 match self.client.topology_type {
2060 TopologyType::Standalone => {
2061 if let Some(pool) = &self.client.standalone_pool {
2062 let result = pool.execute_command("EXEC".to_string(), vec![]).await?;
2063 match result {
2064 RespValue::Array(results) => Ok(results),
2065 RespValue::Null => Ok(vec![]), _ => Err(RedisError::Type(format!(
2067 "Unexpected EXEC response: {:?}",
2068 result
2069 ))),
2070 }
2071 } else {
2072 Err(RedisError::Connection(
2073 "No standalone pool available".to_string(),
2074 ))
2075 }
2076 }
2077 TopologyType::Cluster => {
2078 let pools = self.client.cluster_pools.read().await;
2079 if let Some((_, pool)) = pools.iter().next() {
2080 let result = pool.execute_command("EXEC".to_string(), vec![]).await?;
2081 match result {
2082 RespValue::Array(results) => Ok(results),
2083 RespValue::Null => Ok(vec![]), _ => Err(RedisError::Type(format!(
2085 "Unexpected EXEC response: {:?}",
2086 result
2087 ))),
2088 }
2089 } else {
2090 Err(RedisError::Cluster(
2091 "No cluster nodes available".to_string(),
2092 ))
2093 }
2094 }
2095 }
2096 }
2097
2098 async fn discard(&mut self) -> RedisResult<()> {
2099 match self.client.topology_type {
2101 TopologyType::Standalone => {
2102 if let Some(pool) = &self.client.standalone_pool {
2103 let _result = pool.execute_command("DISCARD".to_string(), vec![]).await?;
2104 Ok(())
2105 } else {
2106 Err(RedisError::Connection(
2107 "No standalone pool available".to_string(),
2108 ))
2109 }
2110 }
2111 TopologyType::Cluster => {
2112 let pools = self.client.cluster_pools.read().await;
2113 if let Some((_, pool)) = pools.iter().next() {
2114 let _result = pool.execute_command("DISCARD".to_string(), vec![]).await?;
2115 Ok(())
2116 } else {
2117 Err(RedisError::Cluster(
2118 "No cluster nodes available".to_string(),
2119 ))
2120 }
2121 }
2122 }
2123 }
2124
2125 async fn watch(&mut self, keys: Vec<String>) -> RedisResult<()> {
2126 let mut args = vec![];
2128 for key in keys {
2129 args.push(RespValue::from(key));
2130 }
2131
2132 match self.client.topology_type {
2133 TopologyType::Standalone => {
2134 if let Some(pool) = &self.client.standalone_pool {
2135 let _result = pool.execute_command("WATCH".to_string(), args).await?;
2136 Ok(())
2137 } else {
2138 Err(RedisError::Connection(
2139 "No standalone pool available".to_string(),
2140 ))
2141 }
2142 }
2143 TopologyType::Cluster => {
2144 let pools = self.client.cluster_pools.read().await;
2145 if let Some((_, pool)) = pools.iter().next() {
2146 let _result = pool.execute_command("WATCH".to_string(), args).await?;
2147 Ok(())
2148 } else {
2149 Err(RedisError::Cluster(
2150 "No cluster nodes available".to_string(),
2151 ))
2152 }
2153 }
2154 }
2155 }
2156
2157 async fn unwatch(&mut self) -> RedisResult<()> {
2158 match self.client.topology_type {
2160 TopologyType::Standalone => {
2161 if let Some(pool) = &self.client.standalone_pool {
2162 let _result = pool.execute_command("UNWATCH".to_string(), vec![]).await?;
2163 Ok(())
2164 } else {
2165 Err(RedisError::Connection(
2166 "No standalone pool available".to_string(),
2167 ))
2168 }
2169 }
2170 TopologyType::Cluster => {
2171 let pools = self.client.cluster_pools.read().await;
2172 if let Some((_, pool)) = pools.iter().next() {
2173 let _result = pool.execute_command("UNWATCH".to_string(), vec![]).await?;
2174 Ok(())
2175 } else {
2176 Err(RedisError::Cluster(
2177 "No cluster nodes available".to_string(),
2178 ))
2179 }
2180 }
2181 }
2182 }
2183}
2184
2185impl ClientTransactionExecutor {
2186 async fn get_node_for_slot(&self, slot: u16) -> RedisResult<String> {
2188 if let Some(topology) = &self.client.cluster_topology {
2189 if let Some((host, port)) = topology.get_node_for_slot(slot).await {
2190 Ok(format!("{}:{}", host, port))
2191 } else {
2192 Err(RedisError::Cluster(format!(
2193 "No node found for slot {}",
2194 slot
2195 )))
2196 }
2197 } else {
2198 Err(RedisError::Cluster(
2199 "No cluster topology available".to_string(),
2200 ))
2201 }
2202 }
2203
2204 async fn get_or_create_pool(&self, node_addr: &str) -> RedisResult<Arc<Pool>> {
2206 let pools = self.client.cluster_pools.read().await;
2207 if let Some(pool) = pools.get(node_addr) {
2208 Ok(pool.clone())
2209 } else {
2210 drop(pools);
2211
2212 let mut pools = self.client.cluster_pools.write().await;
2214
2215 if let Some(pool) = pools.get(node_addr) {
2217 return Ok(pool.clone());
2218 }
2219
2220 let parts: Vec<&str> = node_addr.split(':').collect();
2222 if parts.len() != 2 {
2223 return Err(RedisError::Config(format!(
2224 "Invalid node address: {}",
2225 node_addr
2226 )));
2227 }
2228
2229 let host = parts[0];
2230 let port: u16 = parts[1].parse().map_err(|_| {
2231 RedisError::Config(format!("Invalid port in address: {}", node_addr))
2232 })?;
2233
2234 let node_config = self.client.config.clone();
2236
2237 let pool = Arc::new(Pool::new(node_config, host.to_string(), port).await?);
2238 pools.insert(node_addr.to_string(), pool.clone());
2239
2240 Ok(pool)
2241 }
2242 }
2243}
2244
2245struct ClientPubSubConnection {
2247 client: Client,
2248}
2249
2250#[async_trait::async_trait]
2251impl PubSubConnection for ClientPubSubConnection {
2252 async fn subscribe(&mut self, channels: Vec<String>) -> RedisResult<()> {
2253 let mut args = vec![];
2254 for channel in channels {
2255 args.push(RespValue::from(channel));
2256 }
2257
2258 match self.client.topology_type {
2259 TopologyType::Standalone => {
2260 if let Some(pool) = &self.client.standalone_pool {
2261 let _result = pool.execute_command("SUBSCRIBE".to_string(), args).await?;
2262 Ok(())
2263 } else {
2264 Err(RedisError::Connection(
2265 "No standalone pool available".to_string(),
2266 ))
2267 }
2268 }
2269 TopologyType::Cluster => {
2270 let pools = self.client.cluster_pools.read().await;
2271 if let Some((_, pool)) = pools.iter().next() {
2272 let _result = pool.execute_command("SUBSCRIBE".to_string(), args).await?;
2273 Ok(())
2274 } else {
2275 Err(RedisError::Cluster(
2276 "No cluster nodes available".to_string(),
2277 ))
2278 }
2279 }
2280 }
2281 }
2282
2283 async fn unsubscribe(&mut self, channels: Vec<String>) -> RedisResult<()> {
2284 let mut args = vec![];
2285 for channel in channels {
2286 args.push(RespValue::from(channel));
2287 }
2288
2289 match self.client.topology_type {
2290 TopologyType::Standalone => {
2291 if let Some(pool) = &self.client.standalone_pool {
2292 let _result = pool
2293 .execute_command("UNSUBSCRIBE".to_string(), args)
2294 .await?;
2295 Ok(())
2296 } else {
2297 Err(RedisError::Connection(
2298 "No standalone pool available".to_string(),
2299 ))
2300 }
2301 }
2302 TopologyType::Cluster => {
2303 let pools = self.client.cluster_pools.read().await;
2304 if let Some((_, pool)) = pools.iter().next() {
2305 let _result = pool
2306 .execute_command("UNSUBSCRIBE".to_string(), args)
2307 .await?;
2308 Ok(())
2309 } else {
2310 Err(RedisError::Cluster(
2311 "No cluster nodes available".to_string(),
2312 ))
2313 }
2314 }
2315 }
2316 }
2317
2318 async fn psubscribe(&mut self, patterns: Vec<String>) -> RedisResult<()> {
2319 let mut args = vec![];
2320 for pattern in patterns {
2321 args.push(RespValue::from(pattern));
2322 }
2323
2324 match self.client.topology_type {
2325 TopologyType::Standalone => {
2326 if let Some(pool) = &self.client.standalone_pool {
2327 let _result = pool.execute_command("PSUBSCRIBE".to_string(), args).await?;
2328 Ok(())
2329 } else {
2330 Err(RedisError::Connection(
2331 "No standalone pool available".to_string(),
2332 ))
2333 }
2334 }
2335 TopologyType::Cluster => {
2336 let pools = self.client.cluster_pools.read().await;
2337 if let Some((_, pool)) = pools.iter().next() {
2338 let _result = pool.execute_command("PSUBSCRIBE".to_string(), args).await?;
2339 Ok(())
2340 } else {
2341 Err(RedisError::Cluster(
2342 "No cluster nodes available".to_string(),
2343 ))
2344 }
2345 }
2346 }
2347 }
2348
2349 async fn punsubscribe(&mut self, patterns: Vec<String>) -> RedisResult<()> {
2350 let mut args = vec![];
2351 for pattern in patterns {
2352 args.push(RespValue::from(pattern));
2353 }
2354
2355 match self.client.topology_type {
2356 TopologyType::Standalone => {
2357 if let Some(pool) = &self.client.standalone_pool {
2358 let _result = pool
2359 .execute_command("PUNSUBSCRIBE".to_string(), args)
2360 .await?;
2361 Ok(())
2362 } else {
2363 Err(RedisError::Connection(
2364 "No standalone pool available".to_string(),
2365 ))
2366 }
2367 }
2368 TopologyType::Cluster => {
2369 let pools = self.client.cluster_pools.read().await;
2370 if let Some((_, pool)) = pools.iter().next() {
2371 let _result = pool
2372 .execute_command("PUNSUBSCRIBE".to_string(), args)
2373 .await?;
2374 Ok(())
2375 } else {
2376 Err(RedisError::Cluster(
2377 "No cluster nodes available".to_string(),
2378 ))
2379 }
2380 }
2381 }
2382 }
2383
2384 async fn listen(
2385 &mut self,
2386 message_tx: tokio::sync::mpsc::UnboundedSender<crate::pubsub::PubSubMessage>,
2387 ) -> RedisResult<()> {
2388 drop(message_tx); Ok(())
2404 }
2405
2406 async fn publish(&mut self, channel: String, message: String) -> RedisResult<i64> {
2407 let args = vec![RespValue::from(channel), RespValue::from(message)];
2408
2409 match self.client.topology_type {
2410 TopologyType::Standalone => {
2411 if let Some(pool) = &self.client.standalone_pool {
2412 let result = pool.execute_command("PUBLISH".to_string(), args).await?;
2413 result.as_int()
2414 } else {
2415 Err(RedisError::Connection(
2416 "No standalone pool available".to_string(),
2417 ))
2418 }
2419 }
2420 TopologyType::Cluster => {
2421 let pools = self.client.cluster_pools.read().await;
2422 if let Some((_, pool)) = pools.iter().next() {
2423 let result = pool.execute_command("PUBLISH".to_string(), args).await?;
2424 result.as_int()
2425 } else {
2426 Err(RedisError::Cluster(
2427 "No cluster nodes available".to_string(),
2428 ))
2429 }
2430 }
2431 }
2432 }
2433}
2434
2435#[cfg(test)]
2436mod tests {
2437 use super::*;
2438
2439 #[test]
2440 fn test_client_configuration() {
2441 let config = ConnectionConfig::new("redis://localhost:6379");
2442 assert!(!config.connection_string.is_empty());
2443 }
2444}