1use crate::redis::{RedisPipeline, RedisTransaction};
2use crate::{DbError, RedisConfig, RedisValue, Result};
3use deadpool_redis::{Config, Pool, PoolConfig, Runtime, Timeouts};
4use std::time::Duration;
5
6#[derive(Debug, Clone)]
8pub struct PoolStatus {
9 pub max_size: usize,
11 pub size: usize,
13 pub available: usize,
15 pub waiting: usize,
17}
18
19#[derive(Clone)]
23pub struct RedisClient {
24 pool: Pool,
26 #[allow(dead_code)]
28 config: RedisConfig,
29}
30
31impl RedisClient {
32 pub async fn connect(url: impl Into<String>) -> Result<Self> {
52 let config = RedisConfig::default();
53 Self::connect_with_config(url, config).await
54 }
55
56 pub async fn connect_with_config(url: impl Into<String>, config: RedisConfig) -> Result<Self> {
78 let url_str = url.into();
79
80 let mut cfg = Config::from_url(url_str.clone());
82 cfg.pool = Some(PoolConfig {
83 max_size: config.max_connections,
84 timeouts: Timeouts {
85 wait: Some(Duration::from_secs(config.wait_timeout)),
86 create: Some(Duration::from_secs(config.connect_timeout)),
87 recycle: Some(Duration::from_secs(config.connect_timeout)),
88 },
89 ..Default::default()
90 });
91
92 let pool = cfg
94 .create_pool(Some(Runtime::Tokio1))
95 .map_err(|e| DbError::RedisConnectionError(format!("创建连接池失败: {}", e)))?;
96
97 let mut conn = pool
99 .get()
100 .await
101 .map_err(|e| DbError::RedisConnectionError(format!("获取连接失败: {}", e)))?;
102
103 redis::cmd("PING")
105 .query_async::<String>(&mut *conn)
106 .await
107 .map_err(|e| DbError::RedisConnectionError(format!("连接测试失败: {}", e)))?;
108
109 if config.enable_logging {
110 log::info!("Redis 连接成功: {}", url_str);
111 }
112
113 Ok(Self { pool, config })
114 }
115
116 pub fn pool(&self) -> &Pool {
121 &self.pool
122 }
123
124 pub fn pipeline(&self) -> RedisPipeline {
152 RedisPipeline::new(self.clone())
153 }
154
155 pub fn transaction(&self) -> RedisTransaction {
211 RedisTransaction::new(self.clone())
212 }
213
214 pub async fn execute(&self, cmd: &redis::Cmd) -> Result<RedisValue> {
240 let mut conn = self
241 .pool
242 .get()
243 .await
244 .map_err(|e| DbError::RedisPoolError(format!("获取连接失败: {}", e)))?;
245
246 let value: redis::Value = cmd
247 .query_async(&mut *conn)
248 .await
249 .map_err(|e| DbError::RedisCommandError(format!("命令执行失败: {}", e)))?;
250
251 Ok(RedisValue::from(value))
252 }
253
254 pub async fn set(&self, key: impl Into<String>, value: impl Into<String>) -> Result<()> {
266 let mut cmd = redis::cmd("SET");
267 cmd.arg(key.into()).arg(value.into());
268 self.execute(&cmd).await?;
269 Ok(())
270 }
271
272 pub async fn get(&self, key: impl Into<String>) -> Result<Option<String>> {
282 let mut cmd = redis::cmd("GET");
283 cmd.arg(key.into());
284 let value = self.execute(&cmd).await?;
285 Ok(value.as_string())
286 }
287
288 pub async fn setex(
295 &self,
296 key: impl Into<String>,
297 seconds: i64,
298 value: impl Into<String>,
299 ) -> Result<()> {
300 let mut cmd = redis::cmd("SETEX");
301 cmd.arg(key.into()).arg(seconds).arg(value.into());
302 self.execute(&cmd).await?;
303 Ok(())
304 }
305
306 pub async fn setnx(&self, key: impl Into<String>, value: impl Into<String>) -> Result<bool> {
312 let mut cmd = redis::cmd("SETNX");
313 cmd.arg(key.into()).arg(value.into());
314 let result = self.execute(&cmd).await?;
315 Ok(result.as_i64() == Some(1))
316 }
317
318 pub async fn getset(
324 &self,
325 key: impl Into<String>,
326 value: impl Into<String>,
327 ) -> Result<Option<String>> {
328 let mut cmd = redis::cmd("GETSET");
329 cmd.arg(key.into()).arg(value.into());
330 let result = self.execute(&cmd).await?;
331 Ok(result.as_string())
332 }
333
334 pub async fn mget(&self, keys: &[String]) -> Result<Vec<Option<String>>> {
339 let mut cmd = redis::cmd("MGET");
340 for key in keys {
341 cmd.arg(key);
342 }
343 let result = self.execute(&cmd).await?;
344 if let Some(arr) = result.as_array() {
345 Ok(arr.iter().map(|v| v.as_string()).collect())
346 } else {
347 Ok(vec![])
348 }
349 }
350
351 pub async fn mset(&self, pairs: &[(String, String)]) -> Result<()> {
356 let mut cmd = redis::cmd("MSET");
357 for (key, value) in pairs {
358 cmd.arg(key).arg(value);
359 }
360 self.execute(&cmd).await?;
361 Ok(())
362 }
363
364 pub async fn incr(&self, key: impl Into<String>) -> Result<i64> {
369 let mut cmd = redis::cmd("INCR");
370 cmd.arg(key.into());
371 let result = self.execute(&cmd).await?;
372 result
373 .as_i64()
374 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
375 }
376
377 pub async fn incrby(&self, key: impl Into<String>, increment: i64) -> Result<i64> {
382 let mut cmd = redis::cmd("INCRBY");
383 cmd.arg(key.into()).arg(increment);
384 let result = self.execute(&cmd).await?;
385 result
386 .as_i64()
387 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
388 }
389
390 pub async fn decr(&self, key: impl Into<String>) -> Result<i64> {
395 let mut cmd = redis::cmd("DECR");
396 cmd.arg(key.into());
397 let result = self.execute(&cmd).await?;
398 result
399 .as_i64()
400 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
401 }
402
403 pub async fn decrby(&self, key: impl Into<String>, decrement: i64) -> Result<i64> {
408 let mut cmd = redis::cmd("DECRBY");
409 cmd.arg(key.into()).arg(decrement);
410 let result = self.execute(&cmd).await?;
411 result
412 .as_i64()
413 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
414 }
415
416 pub async fn append(&self, key: impl Into<String>, value: impl Into<String>) -> Result<i64> {
421 let mut cmd = redis::cmd("APPEND");
422 cmd.arg(key.into()).arg(value.into());
423 let result = self.execute(&cmd).await?;
424 result
425 .as_i64()
426 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
427 }
428
429 pub async fn strlen(&self, key: impl Into<String>) -> Result<i64> {
434 let mut cmd = redis::cmd("STRLEN");
435 cmd.arg(key.into());
436 let result = self.execute(&cmd).await?;
437 result
438 .as_i64()
439 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
440 }
441
442 pub async fn getrange(&self, key: impl Into<String>, start: i64, end: i64) -> Result<String> {
469 let mut cmd = redis::cmd("GETRANGE");
470 cmd.arg(key.into()).arg(start).arg(end);
471 let result = self.execute(&cmd).await?;
472 Ok(result.as_string().unwrap_or_default())
473 }
474
475 pub async fn setrange(
500 &self,
501 key: impl Into<String>,
502 offset: i64,
503 value: impl Into<String>,
504 ) -> Result<i64> {
505 let mut cmd = redis::cmd("SETRANGE");
506 cmd.arg(key.into()).arg(offset).arg(value.into());
507 let result = self.execute(&cmd).await?;
508 result
509 .as_i64()
510 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
511 }
512
513 pub async fn incrbyfloat(&self, key: impl Into<String>, increment: f64) -> Result<f64> {
537 let mut cmd = redis::cmd("INCRBYFLOAT");
538 cmd.arg(key.into()).arg(increment);
539 let result = self.execute(&cmd).await?;
540 if let Some(s) = result.as_string() {
542 s.parse::<f64>()
543 .map_err(|_| DbError::RedisTypeConversionError("无法转换为浮点数".to_string()))
544 } else {
545 result
546 .as_f64()
547 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为浮点数".to_string()))
548 }
549 }
550
551 pub async fn psetex(
573 &self,
574 key: impl Into<String>,
575 milliseconds: i64,
576 value: impl Into<String>,
577 ) -> Result<()> {
578 let mut cmd = redis::cmd("PSETEX");
579 cmd.arg(key.into()).arg(milliseconds).arg(value.into());
580 self.execute(&cmd).await?;
581 Ok(())
582 }
583
584 pub async fn hset(
592 &self,
593 key: impl Into<String>,
594 field: impl Into<String>,
595 value: impl Into<String>,
596 ) -> Result<bool> {
597 let mut cmd = redis::cmd("HSET");
598 cmd.arg(key.into()).arg(field.into()).arg(value.into());
599 let result = self.execute(&cmd).await?;
600 Ok(result.as_i64() == Some(1))
601 }
602
603 pub async fn hget(
609 &self,
610 key: impl Into<String>,
611 field: impl Into<String>,
612 ) -> Result<Option<String>> {
613 let mut cmd = redis::cmd("HGET");
614 cmd.arg(key.into()).arg(field.into());
615 let result = self.execute(&cmd).await?;
616 Ok(result.as_string())
617 }
618
619 pub async fn hdel(&self, key: impl Into<String>, fields: &[String]) -> Result<i64> {
624 let mut cmd = redis::cmd("HDEL");
625 cmd.arg(key.into());
626 for field in fields {
627 cmd.arg(field);
628 }
629 let result = self.execute(&cmd).await?;
630 result
631 .as_i64()
632 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
633 }
634
635 pub async fn hexists(&self, key: impl Into<String>, field: impl Into<String>) -> Result<bool> {
641 let mut cmd = redis::cmd("HEXISTS");
642 cmd.arg(key.into()).arg(field.into());
643 let result = self.execute(&cmd).await?;
644 Ok(result.as_i64() == Some(1))
645 }
646
647 pub async fn hmset(&self, key: impl Into<String>, fields: &[(String, String)]) -> Result<()> {
649 let mut cmd = redis::cmd("HMSET");
650 cmd.arg(key.into());
651 for (field, value) in fields {
652 cmd.arg(field).arg(value);
653 }
654 self.execute(&cmd).await?;
655 Ok(())
656 }
657
658 pub async fn hmget(
663 &self,
664 key: impl Into<String>,
665 fields: &[String],
666 ) -> Result<Vec<Option<String>>> {
667 let mut cmd = redis::cmd("HMGET");
668 cmd.arg(key.into());
669 for field in fields {
670 cmd.arg(field);
671 }
672 let result = self.execute(&cmd).await?;
673 if let Some(arr) = result.as_array() {
674 Ok(arr.iter().map(|v| v.as_string()).collect())
675 } else {
676 Ok(vec![])
677 }
678 }
679
680 pub async fn hgetall(&self, key: impl Into<String>) -> Result<Vec<(String, String)>> {
685 let mut cmd = redis::cmd("HGETALL");
686 cmd.arg(key.into());
687 let result = self.execute(&cmd).await?;
688 if let Some(arr) = result.as_array() {
689 let mut pairs = Vec::new();
690 for i in (0..arr.len()).step_by(2) {
691 if i + 1 < arr.len() {
692 if let (Some(field), Some(value)) = (arr[i].as_string(), arr[i + 1].as_string()) {
693 pairs.push((field, value));
694 }
695 }
696 }
697 Ok(pairs)
698 } else {
699 Ok(vec![])
700 }
701 }
702
703 pub async fn hlen(&self, key: impl Into<String>) -> Result<i64> {
705 let mut cmd = redis::cmd("HLEN");
706 cmd.arg(key.into());
707 let result = self.execute(&cmd).await?;
708 result
709 .as_i64()
710 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
711 }
712
713 pub async fn hkeys(&self, key: impl Into<String>) -> Result<Vec<String>> {
715 let mut cmd = redis::cmd("HKEYS");
716 cmd.arg(key.into());
717 let result = self.execute(&cmd).await?;
718 if let Some(arr) = result.as_array() {
719 Ok(arr.iter().filter_map(|v| v.as_string()).collect())
720 } else {
721 Ok(vec![])
722 }
723 }
724
725 pub async fn hvals(&self, key: impl Into<String>) -> Result<Vec<String>> {
727 let mut cmd = redis::cmd("HVALS");
728 cmd.arg(key.into());
729 let result = self.execute(&cmd).await?;
730 if let Some(arr) = result.as_array() {
731 Ok(arr.iter().filter_map(|v| v.as_string()).collect())
732 } else {
733 Ok(vec![])
734 }
735 }
736
737 pub async fn hincrby(
739 &self,
740 key: impl Into<String>,
741 field: impl Into<String>,
742 increment: i64,
743 ) -> Result<i64> {
744 let mut cmd = redis::cmd("HINCRBY");
745 cmd.arg(key.into()).arg(field.into()).arg(increment);
746 let result = self.execute(&cmd).await?;
747 result
748 .as_i64()
749 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
750 }
751
752 pub async fn hincrbyfloat(
754 &self,
755 key: impl Into<String>,
756 field: impl Into<String>,
757 increment: f64,
758 ) -> Result<f64> {
759 let mut cmd = redis::cmd("HINCRBYFLOAT");
760 cmd.arg(key.into()).arg(field.into()).arg(increment);
761 let result = self.execute(&cmd).await?;
762 if let Some(s) = result.as_string() {
764 s.parse::<f64>()
765 .map_err(|_| DbError::RedisTypeConversionError("无法转换为浮点数".to_string()))
766 } else {
767 result
768 .as_f64()
769 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为浮点数".to_string()))
770 }
771 }
772
773 pub async fn lpush(&self, key: impl Into<String>, values: &[String]) -> Result<i64> {
780 let mut cmd = redis::cmd("LPUSH");
781 cmd.arg(key.into());
782 for value in values {
783 cmd.arg(value);
784 }
785 let result = self.execute(&cmd).await?;
786 result
787 .as_i64()
788 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
789 }
790
791 pub async fn rpush(&self, key: impl Into<String>, values: &[String]) -> Result<i64> {
796 let mut cmd = redis::cmd("RPUSH");
797 cmd.arg(key.into());
798 for value in values {
799 cmd.arg(value);
800 }
801 let result = self.execute(&cmd).await?;
802 result
803 .as_i64()
804 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
805 }
806
807 pub async fn lpop(&self, key: impl Into<String>) -> Result<Option<String>> {
813 let mut cmd = redis::cmd("LPOP");
814 cmd.arg(key.into());
815 let result = self.execute(&cmd).await?;
816 Ok(result.as_string())
817 }
818
819 pub async fn rpop(&self, key: impl Into<String>) -> Result<Option<String>> {
825 let mut cmd = redis::cmd("RPOP");
826 cmd.arg(key.into());
827 let result = self.execute(&cmd).await?;
828 Ok(result.as_string())
829 }
830
831 pub async fn lrange(
837 &self,
838 key: impl Into<String>,
839 start: i64,
840 stop: i64,
841 ) -> Result<Vec<String>> {
842 let mut cmd = redis::cmd("LRANGE");
843 cmd.arg(key.into()).arg(start).arg(stop);
844 let result = self.execute(&cmd).await?;
845 if let Some(arr) = result.as_array() {
846 Ok(arr.iter().filter_map(|v| v.as_string()).collect())
847 } else {
848 Ok(vec![])
849 }
850 }
851
852 pub async fn llen(&self, key: impl Into<String>) -> Result<i64> {
854 let mut cmd = redis::cmd("LLEN");
855 cmd.arg(key.into());
856 let result = self.execute(&cmd).await?;
857 result
858 .as_i64()
859 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
860 }
861
862 pub async fn lindex(&self, key: impl Into<String>, index: i64) -> Result<Option<String>> {
868 let mut cmd = redis::cmd("LINDEX");
869 cmd.arg(key.into()).arg(index);
870 let result = self.execute(&cmd).await?;
871 Ok(result.as_string())
872 }
873
874 pub async fn lset(
876 &self,
877 key: impl Into<String>,
878 index: i64,
879 value: impl Into<String>,
880 ) -> Result<()> {
881 let mut cmd = redis::cmd("LSET");
882 cmd.arg(key.into()).arg(index).arg(value.into());
883 self.execute(&cmd).await?;
884 Ok(())
885 }
886
887 pub async fn ltrim(&self, key: impl Into<String>, start: i64, stop: i64) -> Result<()> {
889 let mut cmd = redis::cmd("LTRIM");
890 cmd.arg(key.into()).arg(start).arg(stop);
891 self.execute(&cmd).await?;
892 Ok(())
893 }
894
895 pub async fn linsert(
921 &self,
922 key: impl Into<String>,
923 before_after: &str,
924 pivot: impl Into<String>,
925 value: impl Into<String>,
926 ) -> Result<i64> {
927 let mut cmd = redis::cmd("LINSERT");
928 cmd.arg(key.into())
929 .arg(before_after)
930 .arg(pivot.into())
931 .arg(value.into());
932 let result = self.execute(&cmd).await?;
933 result
934 .as_i64()
935 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
936 }
937
938 pub async fn lrem(
966 &self,
967 key: impl Into<String>,
968 count: i64,
969 value: impl Into<String>,
970 ) -> Result<i64> {
971 let mut cmd = redis::cmd("LREM");
972 cmd.arg(key.into()).arg(count).arg(value.into());
973 let result = self.execute(&cmd).await?;
974 result
975 .as_i64()
976 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
977 }
978
979 pub async fn rpoplpush(
1004 &self,
1005 source: impl Into<String>,
1006 destination: impl Into<String>,
1007 ) -> Result<Option<String>> {
1008 let mut cmd = redis::cmd("RPOPLPUSH");
1009 cmd.arg(source.into()).arg(destination.into());
1010 let result = self.execute(&cmd).await?;
1011 Ok(result.as_string())
1012 }
1013
1014 pub async fn blpop(&self, keys: &[String], timeout: i64) -> Result<Option<(String, String)>> {
1041 let mut cmd = redis::cmd("BLPOP");
1042 for key in keys {
1043 cmd.arg(key);
1044 }
1045 cmd.arg(timeout);
1046 let result = self.execute(&cmd).await?;
1047
1048 if let Some(arr) = result.as_array() {
1050 if arr.len() == 2 {
1051 if let (Some(key), Some(value)) = (arr[0].as_string(), arr[1].as_string()) {
1052 return Ok(Some((key, value)));
1053 }
1054 }
1055 }
1056 Ok(None)
1057 }
1058
1059 pub async fn brpop(&self, keys: &[String], timeout: i64) -> Result<Option<(String, String)>> {
1083 let mut cmd = redis::cmd("BRPOP");
1084 for key in keys {
1085 cmd.arg(key);
1086 }
1087 cmd.arg(timeout);
1088 let result = self.execute(&cmd).await?;
1089
1090 if let Some(arr) = result.as_array() {
1092 if arr.len() == 2 {
1093 if let (Some(key), Some(value)) = (arr[0].as_string(), arr[1].as_string()) {
1094 return Ok(Some((key, value)));
1095 }
1096 }
1097 }
1098 Ok(None)
1099 }
1100
1101 pub async fn sadd(&self, key: impl Into<String>, members: &[String]) -> Result<i64> {
1108 let mut cmd = redis::cmd("SADD");
1109 cmd.arg(key.into());
1110 for member in members {
1111 cmd.arg(member);
1112 }
1113 let result = self.execute(&cmd).await?;
1114 result
1115 .as_i64()
1116 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
1117 }
1118
1119 pub async fn srem(&self, key: impl Into<String>, members: &[String]) -> Result<i64> {
1124 let mut cmd = redis::cmd("SREM");
1125 cmd.arg(key.into());
1126 for member in members {
1127 cmd.arg(member);
1128 }
1129 let result = self.execute(&cmd).await?;
1130 result
1131 .as_i64()
1132 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
1133 }
1134
1135 pub async fn smembers(&self, key: impl Into<String>) -> Result<Vec<String>> {
1137 let mut cmd = redis::cmd("SMEMBERS");
1138 cmd.arg(key.into());
1139 let result = self.execute(&cmd).await?;
1140 if let Some(arr) = result.as_array() {
1141 Ok(arr.iter().filter_map(|v| v.as_string()).collect())
1142 } else {
1143 Ok(vec![])
1144 }
1145 }
1146
1147 pub async fn sismember(
1153 &self,
1154 key: impl Into<String>,
1155 member: impl Into<String>,
1156 ) -> Result<bool> {
1157 let mut cmd = redis::cmd("SISMEMBER");
1158 cmd.arg(key.into()).arg(member.into());
1159 let result = self.execute(&cmd).await?;
1160 Ok(result.as_i64() == Some(1))
1161 }
1162
1163 pub async fn scard(&self, key: impl Into<String>) -> Result<i64> {
1165 let mut cmd = redis::cmd("SCARD");
1166 cmd.arg(key.into());
1167 let result = self.execute(&cmd).await?;
1168 result
1169 .as_i64()
1170 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
1171 }
1172
1173 pub async fn spop(&self, key: impl Into<String>) -> Result<Option<String>> {
1179 let mut cmd = redis::cmd("SPOP");
1180 cmd.arg(key.into());
1181 let result = self.execute(&cmd).await?;
1182 Ok(result.as_string())
1183 }
1184
1185 pub async fn srandmember(&self, key: impl Into<String>) -> Result<Option<String>> {
1191 let mut cmd = redis::cmd("SRANDMEMBER");
1192 cmd.arg(key.into());
1193 let result = self.execute(&cmd).await?;
1194 Ok(result.as_string())
1195 }
1196
1197 pub async fn sinter(&self, keys: &[String]) -> Result<Vec<String>> {
1217 let mut cmd = redis::cmd("SINTER");
1218 for key in keys {
1219 cmd.arg(key);
1220 }
1221 let result = self.execute(&cmd).await?;
1222 if let Some(arr) = result.as_array() {
1223 Ok(arr.iter().filter_map(|v| v.as_string()).collect())
1224 } else {
1225 Ok(vec![])
1226 }
1227 }
1228
1229 pub async fn sunion(&self, keys: &[String]) -> Result<Vec<String>> {
1239 let mut cmd = redis::cmd("SUNION");
1240 for key in keys {
1241 cmd.arg(key);
1242 }
1243 let result = self.execute(&cmd).await?;
1244 if let Some(arr) = result.as_array() {
1245 Ok(arr.iter().filter_map(|v| v.as_string()).collect())
1246 } else {
1247 Ok(vec![])
1248 }
1249 }
1250
1251 pub async fn sdiff(&self, keys: &[String]) -> Result<Vec<String>> {
1261 let mut cmd = redis::cmd("SDIFF");
1262 for key in keys {
1263 cmd.arg(key);
1264 }
1265 let result = self.execute(&cmd).await?;
1266 if let Some(arr) = result.as_array() {
1267 Ok(arr.iter().filter_map(|v| v.as_string()).collect())
1268 } else {
1269 Ok(vec![])
1270 }
1271 }
1272
1273 pub async fn smove(
1284 &self,
1285 source: impl Into<String>,
1286 destination: impl Into<String>,
1287 member: impl Into<String>,
1288 ) -> Result<bool> {
1289 let mut cmd = redis::cmd("SMOVE");
1290 cmd.arg(source.into())
1291 .arg(destination.into())
1292 .arg(member.into());
1293 let result = self.execute(&cmd).await?;
1294 Ok(result.as_i64() == Some(1))
1295 }
1296
1297 pub async fn sscan(
1308 &self,
1309 key: impl Into<String>,
1310 cursor: i64,
1311 pattern: Option<&str>,
1312 count: Option<i64>,
1313 ) -> Result<(i64, Vec<String>)> {
1314 let mut cmd = redis::cmd("SSCAN");
1315 cmd.arg(key.into()).arg(cursor);
1316 if let Some(p) = pattern {
1317 cmd.arg("MATCH").arg(p);
1318 }
1319 if let Some(c) = count {
1320 cmd.arg("COUNT").arg(c);
1321 }
1322 let result = self.execute(&cmd).await?;
1323 parse_scan_result(&result)
1324 }
1325
1326 pub async fn zadd(&self, key: impl Into<String>, members: &[(f64, String)]) -> Result<i64> {
1333 let mut cmd = redis::cmd("ZADD");
1334 cmd.arg(key.into());
1335 for (score, member) in members {
1336 cmd.arg(score).arg(member);
1337 }
1338 let result = self.execute(&cmd).await?;
1339 result
1340 .as_i64()
1341 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
1342 }
1343
1344 pub async fn zrem(&self, key: impl Into<String>, members: &[String]) -> Result<i64> {
1349 let mut cmd = redis::cmd("ZREM");
1350 cmd.arg(key.into());
1351 for member in members {
1352 cmd.arg(member);
1353 }
1354 let result = self.execute(&cmd).await?;
1355 result
1356 .as_i64()
1357 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
1358 }
1359
1360 pub async fn zscore(
1366 &self,
1367 key: impl Into<String>,
1368 member: impl Into<String>,
1369 ) -> Result<Option<f64>> {
1370 let mut cmd = redis::cmd("ZSCORE");
1371 cmd.arg(key.into()).arg(member.into());
1372 let result = self.execute(&cmd).await?;
1373 if let Some(s) = result.as_string() {
1374 Ok(s.parse::<f64>().ok())
1375 } else {
1376 Ok(result.as_f64())
1377 }
1378 }
1379
1380 pub async fn zcard(&self, key: impl Into<String>) -> Result<i64> {
1382 let mut cmd = redis::cmd("ZCARD");
1383 cmd.arg(key.into());
1384 let result = self.execute(&cmd).await?;
1385 result
1386 .as_i64()
1387 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
1388 }
1389
1390 pub async fn zrange(
1396 &self,
1397 key: impl Into<String>,
1398 start: i64,
1399 stop: i64,
1400 ) -> Result<Vec<String>> {
1401 let mut cmd = redis::cmd("ZRANGE");
1402 cmd.arg(key.into()).arg(start).arg(stop);
1403 let result = self.execute(&cmd).await?;
1404 if let Some(arr) = result.as_array() {
1405 Ok(arr.iter().filter_map(|v| v.as_string()).collect())
1406 } else {
1407 Ok(vec![])
1408 }
1409 }
1410
1411 pub async fn zrangebyscore(
1417 &self,
1418 key: impl Into<String>,
1419 min: f64,
1420 max: f64,
1421 ) -> Result<Vec<String>> {
1422 let mut cmd = redis::cmd("ZRANGEBYSCORE");
1423 cmd.arg(key.into()).arg(min).arg(max);
1424 let result = self.execute(&cmd).await?;
1425 if let Some(arr) = result.as_array() {
1426 Ok(arr.iter().filter_map(|v| v.as_string()).collect())
1427 } else {
1428 Ok(vec![])
1429 }
1430 }
1431
1432 pub async fn zcount(&self, key: impl Into<String>, min: f64, max: f64) -> Result<i64> {
1434 let mut cmd = redis::cmd("ZCOUNT");
1435 cmd.arg(key.into()).arg(min).arg(max);
1436 let result = self.execute(&cmd).await?;
1437 result
1438 .as_i64()
1439 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
1440 }
1441
1442 pub async fn zincrby(
1447 &self,
1448 key: impl Into<String>,
1449 increment: f64,
1450 member: impl Into<String>,
1451 ) -> Result<f64> {
1452 let mut cmd = redis::cmd("ZINCRBY");
1453 cmd.arg(key.into()).arg(increment).arg(member.into());
1454 let result = self.execute(&cmd).await?;
1455 if let Some(s) = result.as_string() {
1456 s.parse::<f64>()
1457 .map_err(|_| DbError::RedisTypeConversionError("无法转换为浮点数".to_string()))
1458 } else {
1459 result
1460 .as_f64()
1461 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为浮点数".to_string()))
1462 }
1463 }
1464
1465 pub async fn zrank(
1475 &self,
1476 key: impl Into<String>,
1477 member: impl Into<String>,
1478 ) -> Result<Option<i64>> {
1479 let mut cmd = redis::cmd("ZRANK");
1480 cmd.arg(key.into()).arg(member.into());
1481 let result = self.execute(&cmd).await?;
1482 if result.is_nil() {
1483 Ok(None)
1484 } else {
1485 Ok(result.as_i64())
1486 }
1487 }
1488
1489 pub async fn zrevrank(
1499 &self,
1500 key: impl Into<String>,
1501 member: impl Into<String>,
1502 ) -> Result<Option<i64>> {
1503 let mut cmd = redis::cmd("ZREVRANK");
1504 cmd.arg(key.into()).arg(member.into());
1505 let result = self.execute(&cmd).await?;
1506 if result.is_nil() {
1507 Ok(None)
1508 } else {
1509 Ok(result.as_i64())
1510 }
1511 }
1512
1513 pub async fn zrevrange(
1523 &self,
1524 key: impl Into<String>,
1525 start: i64,
1526 stop: i64,
1527 ) -> Result<Vec<String>> {
1528 let mut cmd = redis::cmd("ZREVRANGE");
1529 cmd.arg(key.into()).arg(start).arg(stop);
1530 let result = self.execute(&cmd).await?;
1531 if let Some(arr) = result.as_array() {
1532 Ok(arr.iter().filter_map(|v| v.as_string()).collect())
1533 } else {
1534 Ok(vec![])
1535 }
1536 }
1537
1538 pub async fn zrange_with_scores(
1548 &self,
1549 key: impl Into<String>,
1550 start: i64,
1551 stop: i64,
1552 ) -> Result<Vec<(String, f64)>> {
1553 let mut cmd = redis::cmd("ZRANGE");
1554 cmd.arg(key.into()).arg(start).arg(stop).arg("WITHSCORES");
1555 let result = self.execute(&cmd).await?;
1556 Ok(parse_with_scores(&result))
1557 }
1558
1559 pub async fn zrevrange_with_scores(
1569 &self,
1570 key: impl Into<String>,
1571 start: i64,
1572 stop: i64,
1573 ) -> Result<Vec<(String, f64)>> {
1574 let mut cmd = redis::cmd("ZREVRANGE");
1575 cmd.arg(key.into()).arg(start).arg(stop).arg("WITHSCORES");
1576 let result = self.execute(&cmd).await?;
1577 Ok(parse_with_scores(&result))
1578 }
1579
1580 pub async fn zremrangebyrank(
1590 &self,
1591 key: impl Into<String>,
1592 start: i64,
1593 stop: i64,
1594 ) -> Result<i64> {
1595 let mut cmd = redis::cmd("ZREMRANGEBYRANK");
1596 cmd.arg(key.into()).arg(start).arg(stop);
1597 let result = self.execute(&cmd).await?;
1598 result
1599 .as_i64()
1600 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
1601 }
1602
1603 pub async fn zremrangebyscore(
1613 &self,
1614 key: impl Into<String>,
1615 min: f64,
1616 max: f64,
1617 ) -> Result<i64> {
1618 let mut cmd = redis::cmd("ZREMRANGEBYSCORE");
1619 cmd.arg(key.into()).arg(min).arg(max);
1620 let result = self.execute(&cmd).await?;
1621 result
1622 .as_i64()
1623 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
1624 }
1625
1626 pub async fn zscan(
1637 &self,
1638 key: impl Into<String>,
1639 cursor: i64,
1640 pattern: Option<&str>,
1641 count: Option<i64>,
1642 ) -> Result<(i64, Vec<(String, f64)>)> {
1643 let mut cmd = redis::cmd("ZSCAN");
1644 cmd.arg(key.into()).arg(cursor);
1645 if let Some(p) = pattern {
1646 cmd.arg("MATCH").arg(p);
1647 }
1648 if let Some(c) = count {
1649 cmd.arg("COUNT").arg(c);
1650 }
1651 let result = self.execute(&cmd).await?;
1652 if let Some(arr) = result.as_array() {
1653 let next_cursor = arr
1654 .first()
1655 .and_then(|v| v.as_string())
1656 .and_then(|s| s.parse::<i64>().ok())
1657 .unwrap_or(0);
1658 if let Some(inner) = arr.get(1) {
1659 let pairs = parse_with_scores(inner);
1660 Ok((next_cursor, pairs))
1661 } else {
1662 Ok((next_cursor, vec![]))
1663 }
1664 } else {
1665 Ok((0, vec![]))
1666 }
1667 }
1668
1669 pub async fn del(&self, keys: &[String]) -> Result<i64> {
1676 let mut cmd = redis::cmd("DEL");
1677 for key in keys {
1678 cmd.arg(key);
1679 }
1680 let result = self.execute(&cmd).await?;
1681 result
1682 .as_i64()
1683 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
1684 }
1685
1686 pub async fn exists(&self, keys: &[String]) -> Result<i64> {
1691 let mut cmd = redis::cmd("EXISTS");
1692 for key in keys {
1693 cmd.arg(key);
1694 }
1695 let result = self.execute(&cmd).await?;
1696 result
1697 .as_i64()
1698 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
1699 }
1700
1701 pub async fn expire(&self, key: impl Into<String>, seconds: i64) -> Result<bool> {
1707 let mut cmd = redis::cmd("EXPIRE");
1708 cmd.arg(key.into()).arg(seconds);
1709 let result = self.execute(&cmd).await?;
1710 Ok(result.as_i64() == Some(1))
1711 }
1712
1713 pub async fn ttl(&self, key: impl Into<String>) -> Result<i64> {
1720 let mut cmd = redis::cmd("TTL");
1721 cmd.arg(key.into());
1722 let result = self.execute(&cmd).await?;
1723 result
1724 .as_i64()
1725 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
1726 }
1727
1728 pub async fn persist(&self, key: impl Into<String>) -> Result<bool> {
1734 let mut cmd = redis::cmd("PERSIST");
1735 cmd.arg(key.into());
1736 let result = self.execute(&cmd).await?;
1737 Ok(result.as_i64() == Some(1))
1738 }
1739
1740 pub async fn keys(&self, pattern: impl Into<String>) -> Result<Vec<String>> {
1748 let mut cmd = redis::cmd("KEYS");
1749 cmd.arg(pattern.into());
1750 let result = self.execute(&cmd).await?;
1751 if let Some(arr) = result.as_array() {
1752 Ok(arr.iter().filter_map(|v| v.as_string()).collect())
1753 } else {
1754 Ok(vec![])
1755 }
1756 }
1757
1758 pub async fn scan(
1787 &self,
1788 cursor: i64,
1789 pattern: Option<&str>,
1790 count: Option<i64>,
1791 ) -> Result<(i64, Vec<String>)> {
1792 let mut cmd = redis::cmd("SCAN");
1793 cmd.arg(cursor);
1794 if let Some(p) = pattern {
1795 cmd.arg("MATCH").arg(p);
1796 }
1797 if let Some(c) = count {
1798 cmd.arg("COUNT").arg(c);
1799 }
1800 let result = self.execute(&cmd).await?;
1801 parse_scan_result(&result)
1802 }
1803
1804 pub async fn publish(
1813 &self,
1814 channel: impl Into<String>,
1815 message: impl Into<String>,
1816 ) -> Result<i64> {
1817 let mut cmd = redis::cmd("PUBLISH");
1818 cmd.arg(channel.into()).arg(message.into());
1819 let result = self.execute(&cmd).await?;
1820 result
1821 .as_i64()
1822 .ok_or_else(|| DbError::RedisTypeConversionError("无法转换为整数".to_string()))
1823 }
1824
1825 pub async fn health_check(&self) -> Result<bool> {
1834 let cmd = redis::cmd("PING");
1835 match self.execute(&cmd).await {
1836 Ok(result) => Ok(!result.is_nil()),
1837 Err(_) => Ok(false),
1838 }
1839 }
1840
1841 pub fn pool_status(&self) -> PoolStatus {
1846 let s = self.pool.status();
1847 PoolStatus {
1848 max_size: s.max_size,
1849 size: s.size,
1850 available: s.available,
1851 waiting: s.waiting,
1852 }
1853 }
1854
1855 pub fn script(&self, code: &str) -> redis::Script {
1897 redis::Script::new(code)
1898 }
1899
1900 pub async fn eval_script<T>(
1987 &self,
1988 script: &redis::Script,
1989 keys: &[String],
1990 args: &[String],
1991 ) -> Result<T>
1992 where
1993 T: redis::FromRedisValue,
1994 {
1995 let mut conn = self
1996 .pool
1997 .get()
1998 .await
1999 .map_err(|e| DbError::RedisPoolError(format!("获取连接失败: {}", e)))?;
2000
2001 let mut invocation = script.prepare_invoke();
2003
2004 for key in keys {
2006 invocation.key(key);
2007 }
2008
2009 for arg in args {
2011 invocation.arg(arg);
2012 }
2013
2014 invocation
2016 .invoke_async(&mut *conn)
2017 .await
2018 .map_err(|e| DbError::RedisCommandError(format!("Lua 脚本执行失败: {}", e)))
2019 }
2020}
2021
2022fn parse_scan_result(result: &RedisValue) -> crate::Result<(i64, Vec<String>)> {
2024 if let Some(arr) = result.as_array() {
2025 let cursor = arr
2026 .first()
2027 .and_then(|v| v.as_string())
2028 .and_then(|s| s.parse::<i64>().ok())
2029 .unwrap_or(0);
2030 let members = arr
2031 .get(1)
2032 .and_then(|v| v.as_array())
2033 .map(|a| a.iter().filter_map(|v| v.as_string()).collect())
2034 .unwrap_or_default();
2035 Ok((cursor, members))
2036 } else {
2037 Ok((0, vec![]))
2038 }
2039}
2040
2041fn parse_with_scores(result: &RedisValue) -> Vec<(String, f64)> {
2043 if let Some(arr) = result.as_array() {
2044 let mut pairs = Vec::new();
2045 let mut i = 0;
2046 while i + 1 < arr.len() {
2047 if let (Some(member), Some(score_str)) = (arr[i].as_string(), arr[i + 1].as_string()) {
2048 if let Ok(score) = score_str.parse::<f64>() {
2049 pairs.push((member, score));
2050 }
2051 }
2052 i += 2;
2053 }
2054 pairs
2055 } else {
2056 vec![]
2057 }
2058}
2059
2060#[cfg(test)]
2061mod tests {
2062 use super::*;
2063
2064 #[test]
2065 fn test_redis_client_clone() {
2066 let config = RedisConfig::default();
2068 let _ = config.clone();
2070 }
2071
2072 #[test]
2073 fn test_redis_config_default() {
2074 let config = RedisConfig::default();
2075 assert_eq!(config.max_connections, 10);
2076 assert_eq!(config.connect_timeout, 5);
2077 assert_eq!(config.wait_timeout, 10);
2078 assert!(!config.enable_logging);
2079 }
2080
2081 #[test]
2082 fn test_redis_config_custom() {
2083 let config = RedisConfig::new(20, 10, 15, true);
2084 assert_eq!(config.max_connections, 20);
2085 assert_eq!(config.connect_timeout, 10);
2086 assert_eq!(config.wait_timeout, 15);
2087 assert!(config.enable_logging);
2088 }
2089
2090 #[test]
2094 fn test_pool_config_construction() {
2095 let redis_config = RedisConfig::new(25, 8, 12, true);
2096
2097 let pool_config = PoolConfig {
2099 max_size: redis_config.max_connections,
2100 timeouts: Timeouts {
2101 wait: Some(Duration::from_secs(redis_config.wait_timeout)),
2102 create: Some(Duration::from_secs(redis_config.connect_timeout)),
2103 recycle: Some(Duration::from_secs(redis_config.connect_timeout)),
2104 },
2105 ..Default::default()
2106 };
2107
2108 assert_eq!(pool_config.max_size, 25, "最大连接数应为 25");
2110 assert_eq!(
2111 pool_config.timeouts.wait,
2112 Some(Duration::from_secs(12)),
2113 "等待超时应为 12 秒"
2114 );
2115 assert_eq!(
2116 pool_config.timeouts.create,
2117 Some(Duration::from_secs(8)),
2118 "创建超时应为 8 秒"
2119 );
2120 assert_eq!(
2121 pool_config.timeouts.recycle,
2122 Some(Duration::from_secs(8)),
2123 "回收超时应为 8 秒"
2124 );
2125 }
2126
2127 #[test]
2129 fn test_default_pool_config_construction() {
2130 let redis_config = RedisConfig::default();
2131
2132 let pool_config = PoolConfig {
2133 max_size: redis_config.max_connections,
2134 timeouts: Timeouts {
2135 wait: Some(Duration::from_secs(redis_config.wait_timeout)),
2136 create: Some(Duration::from_secs(redis_config.connect_timeout)),
2137 recycle: Some(Duration::from_secs(redis_config.connect_timeout)),
2138 },
2139 ..Default::default()
2140 };
2141
2142 assert_eq!(pool_config.max_size, 10, "默认最大连接数应为 10");
2143 assert_eq!(
2144 pool_config.timeouts.wait,
2145 Some(Duration::from_secs(10)),
2146 "默认等待超时应为 10 秒"
2147 );
2148 assert_eq!(
2149 pool_config.timeouts.create,
2150 Some(Duration::from_secs(5)),
2151 "默认创建超时应为 5 秒"
2152 );
2153 }
2154
2155 #[test]
2156 fn test_pool_status_creation() {
2157 let status = PoolStatus {
2158 max_size: 10,
2159 size: 5,
2160 available: 3,
2161 waiting: 2,
2162 };
2163 assert_eq!(status.max_size, 10);
2164 assert_eq!(status.size, 5);
2165 assert_eq!(status.available, 3);
2166 assert_eq!(status.waiting, 2);
2167 }
2168
2169 #[test]
2170 fn test_parse_scan_result_valid() {
2171 let result = RedisValue::Array(vec![
2172 RedisValue::String("5".to_string()),
2173 RedisValue::Array(vec![
2174 RedisValue::String("key1".to_string()),
2175 RedisValue::String("key2".to_string()),
2176 ]),
2177 ]);
2178 let (cursor, keys) = parse_scan_result(&result).unwrap();
2179 assert_eq!(cursor, 5);
2180 assert_eq!(keys, vec!["key1".to_string(), "key2".to_string()]);
2181 }
2182
2183 #[test]
2184 fn test_parse_scan_result_empty() {
2185 let result = RedisValue::Array(vec![
2186 RedisValue::String("0".to_string()),
2187 RedisValue::Array(vec![]),
2188 ]);
2189 let (cursor, keys) = parse_scan_result(&result).unwrap();
2190 assert_eq!(cursor, 0);
2191 assert!(keys.is_empty());
2192 }
2193
2194 #[test]
2195 fn test_parse_with_scores_valid() {
2196 let result = RedisValue::Array(vec![
2197 RedisValue::String("alice".to_string()),
2198 RedisValue::String("100".to_string()),
2199 RedisValue::String("bob".to_string()),
2200 RedisValue::String("95.5".to_string()),
2201 ]);
2202 let pairs = parse_with_scores(&result);
2203 assert_eq!(pairs.len(), 2);
2204 assert_eq!(pairs[0], ("alice".to_string(), 100.0));
2205 assert_eq!(pairs[1], ("bob".to_string(), 95.5));
2206 }
2207
2208 #[test]
2209 fn test_parse_with_scores_empty() {
2210 let result = RedisValue::Array(vec![]);
2211 let pairs = parse_with_scores(&result);
2212 assert!(pairs.is_empty());
2213 }
2214}