1use std::collections::HashMap;
16use std::sync::atomic::AtomicBool;
17use std::sync::{atomic, Arc};
18use std::time::Instant;
19
20use bincode::{Decode, Encode};
21use bytes::Bytes;
22use fred::clients::Transaction;
23use fred::cmd;
24use fred::prelude::{RedisPool as FredRedisPool, *};
25use fred::types::{
26 InfoKind, Limit, MultipleKeys, MultipleOrderedPairs, MultipleValues, MultipleZaddValues,
27 Ordering, RedisKey, RedisMap, XCap, ZRange, ZSort, XID,
28};
29use tracing::{debug, Level};
30
31use crate::metrics::redis::{record_redis_failure, record_redis_success};
32use crate::serialization::{deserialize, serialize};
33
34pub use fred::prelude::RedisError;
36
37#[derive(Clone, Debug)]
38pub struct RedisPool {
39 pool: FredRedisPool,
40 key_prefix: String,
41 connected: Arc<AtomicBool>,
42}
43
44impl RedisPool {
45 pub fn new(pool: FredRedisPool, key_prefix: String) -> Self {
46 Self {
47 pool,
48 key_prefix,
49 connected: Arc::new(AtomicBool::new(false)),
50 }
51 }
52
53 pub async fn configured(config: &crate::config::RedisConfig) -> Result<RedisPool, RedisError> {
54 let mut redis_config = RedisConfig::from_url(config.url().as_str())?;
55 redis_config.tracing = TracingConfig::new(config.tracing);
56 redis_config.tracing.default_tracing_level = Level::DEBUG;
57 redis_config.username.clone_from(&config.username);
58 redis_config.password.clone_from(&config.password);
59
60 let policy = ReconnectPolicy::new_exponential(
62 config.retries.max_attempts,
63 config.retries.min_delay.as_millis() as u32,
64 config.retries.max_delay.as_millis() as u32,
65 config.retries.multiplier.round() as u32,
66 );
67 let pool = FredRedisPool::new(redis_config, None, None, Some(policy), config.pool_size)?;
68
69 Ok(RedisPool {
70 pool,
71 key_prefix: config.key_prefix.clone(),
72 connected: Arc::new(AtomicBool::new(false)),
73 })
74 }
75
76 pub fn with<'a>(
77 &'a self,
78 svc_name: &'static str,
79 api_name: &'static str,
80 ) -> RedisLabelledApi<'a> {
81 RedisLabelledApi {
82 svc_name,
83 api_name,
84 pool: self.pool.clone(),
85 key_prefix: self.key_prefix.clone(),
86 connected: &self.connected,
87 }
88 }
89
90 pub fn serialize<T: Encode>(&self, value: &T) -> Result<Bytes, String> {
91 serialize(value)
92 }
93
94 pub fn deserialize<T: Decode<()>>(&self, bytes: &[u8]) -> Result<T, String> {
95 deserialize(bytes)
96 }
97}
98
99pub struct RedisLabelledApi<'a> {
100 svc_name: &'static str,
101 api_name: &'static str,
102 pool: FredRedisPool,
103 key_prefix: String,
104 connected: &'a AtomicBool,
105}
106
107impl RedisLabelledApi<'_> {
108 pub async fn ensure_connected(&self) -> Result<(), RedisError> {
109 if !self.connected.swap(true, atomic::Ordering::Relaxed) {
110 let _connection = self.pool.connect();
111 self.pool.wait_for_connect().await?;
112 }
113 Ok(())
114 }
115
116 fn record<R>(
117 &self,
118 start: Instant,
119 cmd_name: &'static str,
120 result: RedisResult<R>,
121 ) -> RedisResult<R> {
122 let end = Instant::now();
123 match result {
124 Ok(result) => {
125 record_redis_success(
126 self.svc_name,
127 self.api_name,
128 cmd_name,
129 end.duration_since(start),
130 );
131 Ok(result)
132 }
133 Err(err) => {
134 record_redis_failure(self.svc_name, self.api_name, cmd_name);
135 Err(err)
136 }
137 }
138 }
139
140 fn prefixed_key<K>(&self, key: K) -> String
141 where
142 K: AsRef<str>,
143 {
144 format!("{}{}", &self.key_prefix, key.as_ref())
145 }
146
147 pub async fn del<R, K>(&self, key: K) -> RedisResult<R>
148 where
149 R: FromRedis,
150 K: AsRef<str>,
151 {
152 self.ensure_connected().await?;
153 let start = Instant::now();
154 self.record(start, "DEL", self.pool.del(self.prefixed_key(key)).await)
155 }
156
157 pub async fn del_many<R, K>(&self, key: Vec<K>) -> RedisResult<R>
158 where
159 R: FromRedis,
160 K: AsRef<str>,
161 {
162 self.ensure_connected().await?;
163 let start = Instant::now();
164 self.record(
165 start,
166 "DEL",
167 self.pool
168 .del(key.iter().map(|k| self.prefixed_key(k)).collect::<Vec<_>>())
169 .await,
170 )
171 }
172
173 pub async fn get<R, K>(&self, key: K) -> RedisResult<R>
174 where
175 R: FromRedis,
176 K: AsRef<str>,
177 {
178 self.ensure_connected().await?;
179 let start = Instant::now();
180 self.record(start, "GET", self.pool.get(self.prefixed_key(key)).await)
181 }
182
183 pub async fn exists<R, K>(&self, key: K) -> RedisResult<R>
184 where
185 R: FromRedis,
186 K: AsRef<str>,
187 {
188 self.ensure_connected().await?;
189 let start = Instant::now();
190 self.record(
191 start,
192 "EXISTS",
193 self.pool.exists(self.prefixed_key(key)).await,
194 )
195 }
196
197 pub async fn expire<R, K>(&self, key: K, seconds: i64) -> RedisResult<R>
198 where
199 R: FromRedis,
200 K: Into<RedisKey> + Send + AsRef<str>,
201 {
202 self.ensure_connected().await?;
203 let start = Instant::now();
204 self.record(
205 start,
206 "EXPIRE",
207 self.pool.expire(self.prefixed_key(key), seconds).await,
208 )
209 }
210
211 pub async fn mget<R, K>(&self, keys: K) -> RedisResult<R>
212 where
213 R: FromRedis,
214 K: Into<MultipleKeys> + Send,
215 {
216 self.ensure_connected().await?;
217 let start = Instant::now();
218 let keys = keys.into();
219 self.record(
220 start,
221 "MGET",
222 self.pool
223 .mget(
224 keys.inner()
225 .iter()
226 .map(|k| self.prefixed_key(k.as_str().expect("key must be a string")))
227 .collect::<Vec<_>>(),
228 )
229 .await,
230 )
231 }
232
233 pub async fn hdel<R, K, F>(&self, key: K, fields: F) -> RedisResult<R>
234 where
235 R: FromRedis,
236 K: AsRef<str>,
237 F: Into<MultipleKeys> + Send,
238 {
239 self.ensure_connected().await?;
240 let start = Instant::now();
241 self.record(
242 start,
243 "HDEL",
244 self.pool.hdel(self.prefixed_key(key), fields).await,
245 )
246 }
247
248 pub async fn hexists<R, K, F>(&self, key: K, field: F) -> RedisResult<R>
249 where
250 R: FromRedis,
251 K: AsRef<str>,
252 F: Into<RedisKey> + Send,
253 {
254 self.ensure_connected().await?;
255 let start = Instant::now();
256 self.record(
257 start,
258 "HEXISTS",
259 self.pool.hexists(self.prefixed_key(key), field).await,
260 )
261 }
262
263 pub async fn hget<R, K, F>(&self, key: K, field: F) -> RedisResult<R>
264 where
265 R: FromRedis,
266 K: AsRef<str>,
267 F: Into<RedisKey> + Send,
268 {
269 self.ensure_connected().await?;
270 let start = Instant::now();
271 self.record(
272 start,
273 "HGET",
274 self.pool.hget(self.prefixed_key(key), field).await,
275 )
276 }
277
278 pub async fn hkeys<R, K>(&self, key: K) -> RedisResult<R>
279 where
280 R: FromRedis,
281 K: AsRef<str>,
282 {
283 self.ensure_connected().await?;
284 let start = Instant::now();
285 self.record(
286 start,
287 "HKEYS",
288 self.pool.hkeys(self.prefixed_key(key)).await,
289 )
290 }
291
292 pub async fn hmget<R, K, F>(&self, key: K, fields: F) -> RedisResult<R>
293 where
294 R: FromRedis,
295 K: AsRef<str>,
296 F: Into<MultipleKeys> + Send,
297 {
298 self.ensure_connected().await?;
299 let start = Instant::now();
300 self.record(
301 start,
302 "HMGET",
303 self.pool.hmget(self.prefixed_key(key), fields).await,
304 )
305 }
306
307 pub async fn mset<K, V>(&self, key_values: HashMap<K, V>) -> RedisResult<()>
308 where
309 K: AsRef<str>,
310 V: TryInto<RedisValue> + Send,
311 V::Error: Into<RedisError> + Send,
312 {
313 self.ensure_connected().await?;
314 let start = Instant::now();
315 self.record(
316 start,
317 "MSET",
318 self.pool
319 .mset(
320 key_values
321 .into_iter()
322 .map(|(k, v)| (self.prefixed_key(k), v))
323 .collect::<Vec<_>>(),
324 )
325 .await,
326 )
327 }
328
329 pub async fn hmset<R, K, V>(&self, key: K, values: V) -> RedisResult<R>
330 where
331 R: FromRedis,
332 K: AsRef<str>,
333 V: TryInto<RedisMap> + Send,
334 V::Error: Into<RedisError> + Send,
335 {
336 self.ensure_connected().await?;
337 let start = Instant::now();
338 self.record(
339 start,
340 "HMSET",
341 self.pool.hmset(self.prefixed_key(key), values).await,
342 )
343 }
344
345 pub async fn hset<R, K, V>(&self, key: K, values: V) -> RedisResult<R>
346 where
347 R: FromRedis,
348 K: AsRef<str>,
349 V: TryInto<RedisMap> + Send,
350 V::Error: Into<RedisError> + Send,
351 {
352 self.ensure_connected().await?;
353 let start = Instant::now();
354 self.record(
355 start,
356 "HSET",
357 self.pool.hset(self.prefixed_key(key), values).await,
358 )
359 }
360
361 pub async fn hsetnx<R, K, F, V>(&self, key: K, field: F, value: V) -> RedisResult<R>
362 where
363 R: FromRedis,
364 K: AsRef<str>,
365 F: Into<RedisKey> + Send,
366 V: TryInto<RedisValue> + Send,
367 V::Error: Into<RedisError> + Send,
368 {
369 self.ensure_connected().await?;
370 let start = Instant::now();
371 self.record(
372 start,
373 "HSETNX",
374 self.pool.hsetnx(self.prefixed_key(key), field, value).await,
375 )
376 }
377
378 pub async fn sadd<R, K, V>(&self, key: K, members: V) -> RedisResult<R>
379 where
380 R: FromRedis,
381 K: AsRef<str>,
382 V: TryInto<MultipleValues> + Send,
383 V::Error: Into<RedisError> + Send,
384 {
385 self.ensure_connected().await?;
386 let start = Instant::now();
387 self.record(
388 start,
389 "SADD",
390 self.pool.sadd(self.prefixed_key(key), members).await,
391 )
392 }
393
394 pub async fn set<R, K, V>(
395 &self,
396 key: K,
397 value: V,
398 expire: Option<Expiration>,
399 options: Option<SetOptions>,
400 get: bool,
401 ) -> RedisResult<R>
402 where
403 R: FromRedis,
404 K: AsRef<str>,
405 V: TryInto<RedisValue> + Send,
406 V::Error: Into<RedisError> + Send,
407 {
408 self.ensure_connected().await?;
409 let start = Instant::now();
410 self.record(
411 start,
412 "SET",
413 self.pool
414 .set(self.prefixed_key(key), value, expire, options, get)
415 .await,
416 )
417 }
418
419 pub async fn smembers<R, K>(&self, key: K) -> RedisResult<R>
420 where
421 R: FromRedis,
422 K: AsRef<str>,
423 {
424 self.ensure_connected().await?;
425 let start = Instant::now();
426 self.record(
427 start,
428 "SMEMBERS",
429 self.pool.smembers(self.prefixed_key(key)).await,
430 )
431 }
432
433 pub async fn srem<R, K, V>(&self, key: K, members: V) -> RedisResult<R>
434 where
435 R: FromRedis,
436 K: AsRef<str>,
437 V: TryInto<MultipleValues> + Send,
438 V::Error: Into<RedisError> + Send,
439 {
440 self.ensure_connected().await?;
441 let start = Instant::now();
442 self.record(
443 start,
444 "SREM",
445 self.pool.srem(self.prefixed_key(key), members).await,
446 )
447 }
448
449 pub async fn scard<R, K>(&self, key: K) -> RedisResult<R>
450 where
451 R: FromRedis,
452 K: AsRef<str>,
453 {
454 self.ensure_connected().await?;
455 let start = Instant::now();
456 self.record(
457 start,
458 "SCARD",
459 self.pool.scard(self.prefixed_key(key)).await,
460 )
461 }
462
463 pub async fn xadd<R, K, C, I, F>(
464 &self,
465 key: K,
466 nomkstream: bool,
467 cap: C,
468 id: I,
469 fields: F,
470 ) -> RedisResult<R>
471 where
472 R: FromRedis,
473 K: AsRef<str>,
474 I: Into<XID> + Send,
475 F: TryInto<MultipleOrderedPairs> + Send,
476 F::Error: Into<RedisError> + Send,
477 C: TryInto<XCap> + Send,
478 C::Error: Into<RedisError> + Send,
479 {
480 self.ensure_connected().await?;
481 let start = Instant::now();
482 self.record(
483 start,
484 "XADD",
485 self.pool
486 .xadd(self.prefixed_key(key), nomkstream, cap, id, fields)
487 .await,
488 )
489 }
490
491 pub async fn xlen<R, K>(&self, key: K) -> RedisResult<R>
492 where
493 R: FromRedis,
494 K: AsRef<str>,
495 {
496 self.ensure_connected().await?;
497 let start = Instant::now();
498 self.record(start, "XLEN", self.pool.xlen(self.prefixed_key(key)).await)
499 }
500
501 pub async fn xrange<R, K, S, E>(
502 &self,
503 key: K,
504 start: S,
505 end: E,
506 count: Option<u64>,
507 ) -> RedisResult<R>
508 where
509 R: FromRedis,
510 K: AsRef<str>,
511 S: TryInto<RedisValue> + Send,
512 S::Error: Into<RedisError> + Send,
513 E: TryInto<RedisValue> + Send,
514 E::Error: Into<RedisError> + Send,
515 {
516 self.ensure_connected().await?;
517 let start_time = Instant::now();
518 self.record(
519 start_time,
520 "XRANGE",
521 self.pool
522 .xrange(self.prefixed_key(key), start, end, count)
523 .await,
524 )
525 }
526
527 pub async fn xrevrange<R, K, S, E>(
528 &self,
529 key: K,
530 end: E,
531 start: S,
532 count: Option<u64>,
533 ) -> RedisResult<R>
534 where
535 R: FromRedis,
536 K: AsRef<str>,
537 S: TryInto<RedisValue> + Send,
538 S::Error: Into<RedisError> + Send,
539 E: TryInto<RedisValue> + Send,
540 E::Error: Into<RedisError> + Send,
541 {
542 self.ensure_connected().await?;
543 let start_time = Instant::now();
544 self.record(
545 start_time,
546 "XREVRANGE",
547 self.pool
548 .xrevrange(self.prefixed_key(key), end, start, count)
549 .await,
550 )
551 }
552
553 pub async fn xtrim<R, K, C>(&self, key: K, cap: C) -> RedisResult<R>
554 where
555 R: FromRedis,
556 K: AsRef<str>,
557 C: TryInto<XCap> + Send,
558 C::Error: Into<RedisError> + Send,
559 {
560 self.ensure_connected().await?;
561 let start = Instant::now();
562 self.record(
563 start,
564 "XTRIM",
565 self.pool.xtrim(self.prefixed_key(key), cap).await,
566 )
567 }
568
569 pub async fn zadd<R, K, V>(
570 &self,
571 key: K,
572 options: Option<SetOptions>,
573 ordering: Option<Ordering>,
574 changed: bool,
575 incr: bool,
576 values: V,
577 ) -> RedisResult<R>
578 where
579 R: FromRedis,
580 K: AsRef<str>,
581 V: TryInto<MultipleZaddValues> + Send,
582 V::Error: Into<RedisError> + Send,
583 {
584 self.ensure_connected().await?;
585 let start = Instant::now();
586 self.record(
587 start,
588 "ZADD",
589 self.pool
590 .zadd(
591 self.prefixed_key(key),
592 options,
593 ordering,
594 changed,
595 incr,
596 values,
597 )
598 .await,
599 )
600 }
601
602 pub async fn zrange<R, K, M, N>(
603 &self,
604 key: K,
605 min: M,
606 max: N,
607 sort: Option<ZSort>,
608 rev: bool,
609 limit: Option<Limit>,
610 withscores: bool,
611 ) -> RedisResult<R>
612 where
613 R: FromRedis,
614 K: AsRef<str>,
615 M: TryInto<ZRange> + Send,
616 M::Error: Into<RedisError> + Send,
617 N: TryInto<ZRange> + Send,
618 N::Error: Into<RedisError> + Send,
619 {
620 self.ensure_connected().await?;
621 let start = Instant::now();
622 self.record(
623 start,
624 "ZRANGE",
625 self.pool
626 .zrange(
627 self.prefixed_key(key),
628 min,
629 max,
630 sort,
631 rev,
632 limit,
633 withscores,
634 )
635 .await,
636 )
637 }
638
639 pub async fn zrangebyscore<R, K, M, N>(
640 &self,
641 key: K,
642 min: M,
643 max: N,
644 withscores: bool,
645 limit: Option<Limit>,
646 ) -> RedisResult<R>
647 where
648 R: FromRedis,
649 K: AsRef<str>,
650 M: TryInto<ZRange> + Send,
651 M::Error: Into<RedisError> + Send,
652 N: TryInto<ZRange> + Send,
653 N::Error: Into<RedisError> + Send,
654 {
655 self.ensure_connected().await?;
656 let start = Instant::now();
657 self.record(
658 start,
659 "ZRANGEBYSCORE",
660 self.pool
661 .zrangebyscore(self.prefixed_key(key), min, max, withscores, limit)
662 .await,
663 )
664 }
665
666 pub async fn zrem<R, K, V>(&self, key: K, members: V) -> RedisResult<R>
667 where
668 R: FromRedis,
669 K: AsRef<str>,
670 V: TryInto<MultipleValues> + Send,
671 V::Error: Into<RedisError> + Send,
672 {
673 self.ensure_connected().await?;
674 let start = Instant::now();
675 self.record(
676 start,
677 "ZREM",
678 self.pool.zrem(self.prefixed_key(key), members).await,
679 )
680 }
681
682 pub async fn transaction<R, F, Fu>(&self, func: F) -> RedisResult<R>
683 where
684 R: FromRedis,
685 F: FnOnce(RedisTransaction) -> Fu,
686 Fu: std::future::Future<Output = RedisResult<RedisTransaction>>,
687 {
688 self.ensure_connected().await?;
689 let start = Instant::now();
690
691 let client = self.pool.next_connected();
692 let trx = client.multi();
693 let trx = RedisTransaction::new(trx, self.key_prefix.clone());
694 let trx = func(trx).await?;
695
696 self.record(start, "MULTI", trx.trx.exec(true).await)
697 }
698
699 pub async fn wait(&self, replicas: i64, timeout: i64) -> RedisResult<i64> {
700 self.ensure_connected().await?;
701 let start = Instant::now();
702 self.record(start, "WAIT", self.pool.wait(replicas, timeout).await)
703 }
704
705 pub async fn info_connected_slaves(&self) -> RedisResult<u8> {
706 self.ensure_connected().await?;
707 let start = Instant::now();
708 let info: String = self.record(
709 start,
710 "INFO",
711 self.pool.info(Some(InfoKind::Replication)).await,
712 )?;
713 let info: HashMap<&str, &str> =
714 HashMap::from_iter(info.lines().filter_map(|line| line.trim().split_once(':')));
715 debug!(info = format!("{:?}", info), "Redis replication info");
716 let connected_slaves = info
717 .get("connected_slaves")
718 .and_then(|s| s.parse().ok())
719 .unwrap_or(0);
720 Ok(connected_slaves)
721 }
722
723 pub async fn scan<K>(
724 &self,
725 pattern: K,
726 cursor: u64,
727 count: u64,
728 ) -> RedisResult<(u64, Vec<String>)>
729 where
730 K: AsRef<str>,
731 {
732 self.ensure_connected().await?;
733 let start = Instant::now();
734
735 let args: Vec<String> = vec![
737 cursor.to_string(),
738 "MATCH".to_string(),
739 self.prefixed_key(pattern),
740 "COUNT".to_string(),
741 count.to_string(),
742 ];
743
744 self.record(
747 start,
748 "SCAN",
749 self.pool
750 .next()
751 .custom_raw(cmd!("SCAN"), args)
752 .await
753 .and_then(|f| self.parse_key_scan_frame(f)),
754 )
755 }
756
757 pub async fn keys<K>(&self, pattern: K) -> RedisResult<Vec<String>>
758 where
759 K: AsRef<str>,
760 {
761 self.ensure_connected().await?;
762 let start = Instant::now();
763
764 let args: Vec<String> = vec![self.prefixed_key(pattern)];
766
767 self.record(
770 start,
771 "KEYS",
772 self.pool
773 .next()
774 .custom_raw(cmd!("KEYS"), args)
775 .await
776 .and_then(|f| f.try_into())
777 .and_then(|v: RedisValue| v.convert::<Vec<String>>())
778 .map(|keys| {
779 keys.into_iter()
780 .map(|key| key[self.key_prefix.len()..].to_string())
781 .collect()
782 }),
783 )
784 }
785
786 fn parse_key_scan_frame(&self, frame: Resp3Frame) -> RedisResult<(u64, Vec<String>)> {
787 use fred::prelude::*;
788 if let Resp3Frame::Array { mut data, .. } = frame {
789 if data.len() == 2 {
790 let cursor: u64 = data[0]
791 .clone()
792 .try_into()
793 .and_then(|value: RedisValue| value.convert())?;
794
795 if let Some(Resp3Frame::Array { data, .. }) = data.pop() {
796 let mut keys = Vec::with_capacity(data.len());
797
798 let key_prefix_len = self.key_prefix.len();
799
800 for frame in data.into_iter() {
801 let key: String = frame
802 .try_into()
803 .and_then(|value: RedisValue| value.convert())?;
804
805 if key_prefix_len > 0 {
806 keys.push(key[key_prefix_len..].to_string());
807 } else {
808 keys.push(key);
809 }
810 }
811
812 Ok((cursor, keys))
813 } else {
814 Err(RedisError::new(
815 RedisErrorKind::Protocol,
816 "Expected second SCAN result element to be an array.",
817 ))
818 }
819 } else {
820 Err(RedisError::new(
821 RedisErrorKind::Protocol,
822 "Expected two-element bulk string array from SCAN.",
823 ))
824 }
825 } else {
826 Err(RedisError::new(
827 RedisErrorKind::Protocol,
828 "Expected bulk string array from SCAN.",
829 ))
830 }
831 }
832}
833
834pub struct RedisTransaction {
835 trx: Transaction,
836 key_prefix: String,
837}
838
839impl RedisTransaction {
840 fn new(trx: Transaction, key_prefix: String) -> Self {
841 Self { trx, key_prefix }
842 }
843
844 fn prefixed_key<K>(&self, key: K) -> String
845 where
846 K: AsRef<str>,
847 {
848 format!("{}{}", &self.key_prefix, key.as_ref())
849 }
850
851 pub async fn del<K>(&self, key: K) -> RedisResult<()>
852 where
853 K: AsRef<str>,
854 {
855 self.trx.del(self.prefixed_key(key)).await
856 }
857
858 pub async fn set<K, V>(
859 &self,
860 key: K,
861 value: V,
862 expire: Option<Expiration>,
863 options: Option<SetOptions>,
864 get: bool,
865 ) -> RedisResult<()>
866 where
867 K: AsRef<str>,
868 V: TryInto<RedisValue> + Send,
869 V::Error: Into<RedisError> + Send,
870 {
871 self.trx
872 .set(self.prefixed_key(key), value, expire, options, get)
873 .await
874 }
875
876 pub async fn sadd<K, V>(&self, key: K, members: V) -> RedisResult<()>
877 where
878 K: AsRef<str>,
879 V: TryInto<MultipleValues> + Send,
880 V::Error: Into<RedisError> + Send,
881 {
882 self.trx.sadd(self.prefixed_key(key), members).await
883 }
884
885 pub async fn srem<K, V>(&self, key: K, members: V) -> RedisResult<()>
886 where
887 K: AsRef<str>,
888 V: TryInto<MultipleValues> + Send,
889 V::Error: Into<RedisError> + Send,
890 {
891 self.trx.srem(self.prefixed_key(key), members).await
892 }
893
894 pub async fn scard<K>(&self, key: K) -> RedisResult<()>
895 where
896 K: AsRef<str>,
897 {
898 self.trx.scard(self.prefixed_key(key)).await
899 }
900}