golem_common/
redis.rs

1// Copyright 2024-2025 Golem Cloud
2//
3// Licensed under the Golem Source License v1.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://license.golem.cloud/LICENSE
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::AtomicBool;
17use std::sync::{atomic, Arc};
18use std::time::Instant;
19
20use bincode::{Decode, Encode};
21use bytes::Bytes;
22use fred::clients::Transaction;
23use fred::cmd;
24use fred::prelude::{RedisPool as FredRedisPool, *};
25use fred::types::{
26    InfoKind, Limit, MultipleKeys, MultipleOrderedPairs, MultipleValues, MultipleZaddValues,
27    Ordering, RedisKey, RedisMap, XCap, ZRange, ZSort, XID,
28};
29use tracing::{debug, Level};
30
31use crate::metrics::redis::{record_redis_failure, record_redis_success};
32use crate::serialization::{deserialize, serialize};
33
34// Re-export fred Error
35pub use fred::prelude::RedisError;
36
37#[derive(Clone, Debug)]
38pub struct RedisPool {
39    pool: FredRedisPool,
40    key_prefix: String,
41    connected: Arc<AtomicBool>,
42}
43
44impl RedisPool {
45    pub fn new(pool: FredRedisPool, key_prefix: String) -> Self {
46        Self {
47            pool,
48            key_prefix,
49            connected: Arc::new(AtomicBool::new(false)),
50        }
51    }
52
53    pub async fn configured(config: &crate::config::RedisConfig) -> Result<RedisPool, RedisError> {
54        let mut redis_config = RedisConfig::from_url(config.url().as_str())?;
55        redis_config.tracing = TracingConfig::new(config.tracing);
56        redis_config.tracing.default_tracing_level = Level::DEBUG;
57        redis_config.username.clone_from(&config.username);
58        redis_config.password.clone_from(&config.password);
59
60        // NOTE: jitter setting is not converted, using the default fred jitter settings
61        let policy = ReconnectPolicy::new_exponential(
62            config.retries.max_attempts,
63            config.retries.min_delay.as_millis() as u32,
64            config.retries.max_delay.as_millis() as u32,
65            config.retries.multiplier.round() as u32,
66        );
67        let pool = FredRedisPool::new(redis_config, None, None, Some(policy), config.pool_size)?;
68
69        Ok(RedisPool {
70            pool,
71            key_prefix: config.key_prefix.clone(),
72            connected: Arc::new(AtomicBool::new(false)),
73        })
74    }
75
76    pub fn with<'a>(
77        &'a self,
78        svc_name: &'static str,
79        api_name: &'static str,
80    ) -> RedisLabelledApi<'a> {
81        RedisLabelledApi {
82            svc_name,
83            api_name,
84            pool: self.pool.clone(),
85            key_prefix: self.key_prefix.clone(),
86            connected: &self.connected,
87        }
88    }
89
90    pub fn serialize<T: Encode>(&self, value: &T) -> Result<Bytes, String> {
91        serialize(value)
92    }
93
94    pub fn deserialize<T: Decode<()>>(&self, bytes: &[u8]) -> Result<T, String> {
95        deserialize(bytes)
96    }
97}
98
99pub struct RedisLabelledApi<'a> {
100    svc_name: &'static str,
101    api_name: &'static str,
102    pool: FredRedisPool,
103    key_prefix: String,
104    connected: &'a AtomicBool,
105}
106
107impl RedisLabelledApi<'_> {
108    pub async fn ensure_connected(&self) -> Result<(), RedisError> {
109        if !self.connected.swap(true, atomic::Ordering::Relaxed) {
110            let _connection = self.pool.connect();
111            self.pool.wait_for_connect().await?;
112        }
113        Ok(())
114    }
115
116    fn record<R>(
117        &self,
118        start: Instant,
119        cmd_name: &'static str,
120        result: RedisResult<R>,
121    ) -> RedisResult<R> {
122        let end = Instant::now();
123        match result {
124            Ok(result) => {
125                record_redis_success(
126                    self.svc_name,
127                    self.api_name,
128                    cmd_name,
129                    end.duration_since(start),
130                );
131                Ok(result)
132            }
133            Err(err) => {
134                record_redis_failure(self.svc_name, self.api_name, cmd_name);
135                Err(err)
136            }
137        }
138    }
139
140    fn prefixed_key<K>(&self, key: K) -> String
141    where
142        K: AsRef<str>,
143    {
144        format!("{}{}", &self.key_prefix, key.as_ref())
145    }
146
147    pub async fn del<R, K>(&self, key: K) -> RedisResult<R>
148    where
149        R: FromRedis,
150        K: AsRef<str>,
151    {
152        self.ensure_connected().await?;
153        let start = Instant::now();
154        self.record(start, "DEL", self.pool.del(self.prefixed_key(key)).await)
155    }
156
157    pub async fn del_many<R, K>(&self, key: Vec<K>) -> RedisResult<R>
158    where
159        R: FromRedis,
160        K: AsRef<str>,
161    {
162        self.ensure_connected().await?;
163        let start = Instant::now();
164        self.record(
165            start,
166            "DEL",
167            self.pool
168                .del(key.iter().map(|k| self.prefixed_key(k)).collect::<Vec<_>>())
169                .await,
170        )
171    }
172
173    pub async fn get<R, K>(&self, key: K) -> RedisResult<R>
174    where
175        R: FromRedis,
176        K: AsRef<str>,
177    {
178        self.ensure_connected().await?;
179        let start = Instant::now();
180        self.record(start, "GET", self.pool.get(self.prefixed_key(key)).await)
181    }
182
183    pub async fn exists<R, K>(&self, key: K) -> RedisResult<R>
184    where
185        R: FromRedis,
186        K: AsRef<str>,
187    {
188        self.ensure_connected().await?;
189        let start = Instant::now();
190        self.record(
191            start,
192            "EXISTS",
193            self.pool.exists(self.prefixed_key(key)).await,
194        )
195    }
196
197    pub async fn expire<R, K>(&self, key: K, seconds: i64) -> RedisResult<R>
198    where
199        R: FromRedis,
200        K: Into<RedisKey> + Send + AsRef<str>,
201    {
202        self.ensure_connected().await?;
203        let start = Instant::now();
204        self.record(
205            start,
206            "EXPIRE",
207            self.pool.expire(self.prefixed_key(key), seconds).await,
208        )
209    }
210
211    pub async fn mget<R, K>(&self, keys: K) -> RedisResult<R>
212    where
213        R: FromRedis,
214        K: Into<MultipleKeys> + Send,
215    {
216        self.ensure_connected().await?;
217        let start = Instant::now();
218        let keys = keys.into();
219        self.record(
220            start,
221            "MGET",
222            self.pool
223                .mget(
224                    keys.inner()
225                        .iter()
226                        .map(|k| self.prefixed_key(k.as_str().expect("key must be a string")))
227                        .collect::<Vec<_>>(),
228                )
229                .await,
230        )
231    }
232
233    pub async fn hdel<R, K, F>(&self, key: K, fields: F) -> RedisResult<R>
234    where
235        R: FromRedis,
236        K: AsRef<str>,
237        F: Into<MultipleKeys> + Send,
238    {
239        self.ensure_connected().await?;
240        let start = Instant::now();
241        self.record(
242            start,
243            "HDEL",
244            self.pool.hdel(self.prefixed_key(key), fields).await,
245        )
246    }
247
248    pub async fn hexists<R, K, F>(&self, key: K, field: F) -> RedisResult<R>
249    where
250        R: FromRedis,
251        K: AsRef<str>,
252        F: Into<RedisKey> + Send,
253    {
254        self.ensure_connected().await?;
255        let start = Instant::now();
256        self.record(
257            start,
258            "HEXISTS",
259            self.pool.hexists(self.prefixed_key(key), field).await,
260        )
261    }
262
263    pub async fn hget<R, K, F>(&self, key: K, field: F) -> RedisResult<R>
264    where
265        R: FromRedis,
266        K: AsRef<str>,
267        F: Into<RedisKey> + Send,
268    {
269        self.ensure_connected().await?;
270        let start = Instant::now();
271        self.record(
272            start,
273            "HGET",
274            self.pool.hget(self.prefixed_key(key), field).await,
275        )
276    }
277
278    pub async fn hkeys<R, K>(&self, key: K) -> RedisResult<R>
279    where
280        R: FromRedis,
281        K: AsRef<str>,
282    {
283        self.ensure_connected().await?;
284        let start = Instant::now();
285        self.record(
286            start,
287            "HKEYS",
288            self.pool.hkeys(self.prefixed_key(key)).await,
289        )
290    }
291
292    pub async fn hmget<R, K, F>(&self, key: K, fields: F) -> RedisResult<R>
293    where
294        R: FromRedis,
295        K: AsRef<str>,
296        F: Into<MultipleKeys> + Send,
297    {
298        self.ensure_connected().await?;
299        let start = Instant::now();
300        self.record(
301            start,
302            "HMGET",
303            self.pool.hmget(self.prefixed_key(key), fields).await,
304        )
305    }
306
307    pub async fn mset<K, V>(&self, key_values: HashMap<K, V>) -> RedisResult<()>
308    where
309        K: AsRef<str>,
310        V: TryInto<RedisValue> + Send,
311        V::Error: Into<RedisError> + Send,
312    {
313        self.ensure_connected().await?;
314        let start = Instant::now();
315        self.record(
316            start,
317            "MSET",
318            self.pool
319                .mset(
320                    key_values
321                        .into_iter()
322                        .map(|(k, v)| (self.prefixed_key(k), v))
323                        .collect::<Vec<_>>(),
324                )
325                .await,
326        )
327    }
328
329    pub async fn hmset<R, K, V>(&self, key: K, values: V) -> RedisResult<R>
330    where
331        R: FromRedis,
332        K: AsRef<str>,
333        V: TryInto<RedisMap> + Send,
334        V::Error: Into<RedisError> + Send,
335    {
336        self.ensure_connected().await?;
337        let start = Instant::now();
338        self.record(
339            start,
340            "HMSET",
341            self.pool.hmset(self.prefixed_key(key), values).await,
342        )
343    }
344
345    pub async fn hset<R, K, V>(&self, key: K, values: V) -> RedisResult<R>
346    where
347        R: FromRedis,
348        K: AsRef<str>,
349        V: TryInto<RedisMap> + Send,
350        V::Error: Into<RedisError> + Send,
351    {
352        self.ensure_connected().await?;
353        let start = Instant::now();
354        self.record(
355            start,
356            "HSET",
357            self.pool.hset(self.prefixed_key(key), values).await,
358        )
359    }
360
361    pub async fn hsetnx<R, K, F, V>(&self, key: K, field: F, value: V) -> RedisResult<R>
362    where
363        R: FromRedis,
364        K: AsRef<str>,
365        F: Into<RedisKey> + Send,
366        V: TryInto<RedisValue> + Send,
367        V::Error: Into<RedisError> + Send,
368    {
369        self.ensure_connected().await?;
370        let start = Instant::now();
371        self.record(
372            start,
373            "HSETNX",
374            self.pool.hsetnx(self.prefixed_key(key), field, value).await,
375        )
376    }
377
378    pub async fn sadd<R, K, V>(&self, key: K, members: V) -> RedisResult<R>
379    where
380        R: FromRedis,
381        K: AsRef<str>,
382        V: TryInto<MultipleValues> + Send,
383        V::Error: Into<RedisError> + Send,
384    {
385        self.ensure_connected().await?;
386        let start = Instant::now();
387        self.record(
388            start,
389            "SADD",
390            self.pool.sadd(self.prefixed_key(key), members).await,
391        )
392    }
393
394    pub async fn set<R, K, V>(
395        &self,
396        key: K,
397        value: V,
398        expire: Option<Expiration>,
399        options: Option<SetOptions>,
400        get: bool,
401    ) -> RedisResult<R>
402    where
403        R: FromRedis,
404        K: AsRef<str>,
405        V: TryInto<RedisValue> + Send,
406        V::Error: Into<RedisError> + Send,
407    {
408        self.ensure_connected().await?;
409        let start = Instant::now();
410        self.record(
411            start,
412            "SET",
413            self.pool
414                .set(self.prefixed_key(key), value, expire, options, get)
415                .await,
416        )
417    }
418
419    pub async fn smembers<R, K>(&self, key: K) -> RedisResult<R>
420    where
421        R: FromRedis,
422        K: AsRef<str>,
423    {
424        self.ensure_connected().await?;
425        let start = Instant::now();
426        self.record(
427            start,
428            "SMEMBERS",
429            self.pool.smembers(self.prefixed_key(key)).await,
430        )
431    }
432
433    pub async fn srem<R, K, V>(&self, key: K, members: V) -> RedisResult<R>
434    where
435        R: FromRedis,
436        K: AsRef<str>,
437        V: TryInto<MultipleValues> + Send,
438        V::Error: Into<RedisError> + Send,
439    {
440        self.ensure_connected().await?;
441        let start = Instant::now();
442        self.record(
443            start,
444            "SREM",
445            self.pool.srem(self.prefixed_key(key), members).await,
446        )
447    }
448
449    pub async fn scard<R, K>(&self, key: K) -> RedisResult<R>
450    where
451        R: FromRedis,
452        K: AsRef<str>,
453    {
454        self.ensure_connected().await?;
455        let start = Instant::now();
456        self.record(
457            start,
458            "SCARD",
459            self.pool.scard(self.prefixed_key(key)).await,
460        )
461    }
462
463    pub async fn xadd<R, K, C, I, F>(
464        &self,
465        key: K,
466        nomkstream: bool,
467        cap: C,
468        id: I,
469        fields: F,
470    ) -> RedisResult<R>
471    where
472        R: FromRedis,
473        K: AsRef<str>,
474        I: Into<XID> + Send,
475        F: TryInto<MultipleOrderedPairs> + Send,
476        F::Error: Into<RedisError> + Send,
477        C: TryInto<XCap> + Send,
478        C::Error: Into<RedisError> + Send,
479    {
480        self.ensure_connected().await?;
481        let start = Instant::now();
482        self.record(
483            start,
484            "XADD",
485            self.pool
486                .xadd(self.prefixed_key(key), nomkstream, cap, id, fields)
487                .await,
488        )
489    }
490
491    pub async fn xlen<R, K>(&self, key: K) -> RedisResult<R>
492    where
493        R: FromRedis,
494        K: AsRef<str>,
495    {
496        self.ensure_connected().await?;
497        let start = Instant::now();
498        self.record(start, "XLEN", self.pool.xlen(self.prefixed_key(key)).await)
499    }
500
501    pub async fn xrange<R, K, S, E>(
502        &self,
503        key: K,
504        start: S,
505        end: E,
506        count: Option<u64>,
507    ) -> RedisResult<R>
508    where
509        R: FromRedis,
510        K: AsRef<str>,
511        S: TryInto<RedisValue> + Send,
512        S::Error: Into<RedisError> + Send,
513        E: TryInto<RedisValue> + Send,
514        E::Error: Into<RedisError> + Send,
515    {
516        self.ensure_connected().await?;
517        let start_time = Instant::now();
518        self.record(
519            start_time,
520            "XRANGE",
521            self.pool
522                .xrange(self.prefixed_key(key), start, end, count)
523                .await,
524        )
525    }
526
527    pub async fn xrevrange<R, K, S, E>(
528        &self,
529        key: K,
530        end: E,
531        start: S,
532        count: Option<u64>,
533    ) -> RedisResult<R>
534    where
535        R: FromRedis,
536        K: AsRef<str>,
537        S: TryInto<RedisValue> + Send,
538        S::Error: Into<RedisError> + Send,
539        E: TryInto<RedisValue> + Send,
540        E::Error: Into<RedisError> + Send,
541    {
542        self.ensure_connected().await?;
543        let start_time = Instant::now();
544        self.record(
545            start_time,
546            "XREVRANGE",
547            self.pool
548                .xrevrange(self.prefixed_key(key), end, start, count)
549                .await,
550        )
551    }
552
553    pub async fn xtrim<R, K, C>(&self, key: K, cap: C) -> RedisResult<R>
554    where
555        R: FromRedis,
556        K: AsRef<str>,
557        C: TryInto<XCap> + Send,
558        C::Error: Into<RedisError> + Send,
559    {
560        self.ensure_connected().await?;
561        let start = Instant::now();
562        self.record(
563            start,
564            "XTRIM",
565            self.pool.xtrim(self.prefixed_key(key), cap).await,
566        )
567    }
568
569    pub async fn zadd<R, K, V>(
570        &self,
571        key: K,
572        options: Option<SetOptions>,
573        ordering: Option<Ordering>,
574        changed: bool,
575        incr: bool,
576        values: V,
577    ) -> RedisResult<R>
578    where
579        R: FromRedis,
580        K: AsRef<str>,
581        V: TryInto<MultipleZaddValues> + Send,
582        V::Error: Into<RedisError> + Send,
583    {
584        self.ensure_connected().await?;
585        let start = Instant::now();
586        self.record(
587            start,
588            "ZADD",
589            self.pool
590                .zadd(
591                    self.prefixed_key(key),
592                    options,
593                    ordering,
594                    changed,
595                    incr,
596                    values,
597                )
598                .await,
599        )
600    }
601
602    pub async fn zrange<R, K, M, N>(
603        &self,
604        key: K,
605        min: M,
606        max: N,
607        sort: Option<ZSort>,
608        rev: bool,
609        limit: Option<Limit>,
610        withscores: bool,
611    ) -> RedisResult<R>
612    where
613        R: FromRedis,
614        K: AsRef<str>,
615        M: TryInto<ZRange> + Send,
616        M::Error: Into<RedisError> + Send,
617        N: TryInto<ZRange> + Send,
618        N::Error: Into<RedisError> + Send,
619    {
620        self.ensure_connected().await?;
621        let start = Instant::now();
622        self.record(
623            start,
624            "ZRANGE",
625            self.pool
626                .zrange(
627                    self.prefixed_key(key),
628                    min,
629                    max,
630                    sort,
631                    rev,
632                    limit,
633                    withscores,
634                )
635                .await,
636        )
637    }
638
639    pub async fn zrangebyscore<R, K, M, N>(
640        &self,
641        key: K,
642        min: M,
643        max: N,
644        withscores: bool,
645        limit: Option<Limit>,
646    ) -> RedisResult<R>
647    where
648        R: FromRedis,
649        K: AsRef<str>,
650        M: TryInto<ZRange> + Send,
651        M::Error: Into<RedisError> + Send,
652        N: TryInto<ZRange> + Send,
653        N::Error: Into<RedisError> + Send,
654    {
655        self.ensure_connected().await?;
656        let start = Instant::now();
657        self.record(
658            start,
659            "ZRANGEBYSCORE",
660            self.pool
661                .zrangebyscore(self.prefixed_key(key), min, max, withscores, limit)
662                .await,
663        )
664    }
665
666    pub async fn zrem<R, K, V>(&self, key: K, members: V) -> RedisResult<R>
667    where
668        R: FromRedis,
669        K: AsRef<str>,
670        V: TryInto<MultipleValues> + Send,
671        V::Error: Into<RedisError> + Send,
672    {
673        self.ensure_connected().await?;
674        let start = Instant::now();
675        self.record(
676            start,
677            "ZREM",
678            self.pool.zrem(self.prefixed_key(key), members).await,
679        )
680    }
681
682    pub async fn transaction<R, F, Fu>(&self, func: F) -> RedisResult<R>
683    where
684        R: FromRedis,
685        F: FnOnce(RedisTransaction) -> Fu,
686        Fu: std::future::Future<Output = RedisResult<RedisTransaction>>,
687    {
688        self.ensure_connected().await?;
689        let start = Instant::now();
690
691        let client = self.pool.next_connected();
692        let trx = client.multi();
693        let trx = RedisTransaction::new(trx, self.key_prefix.clone());
694        let trx = func(trx).await?;
695
696        self.record(start, "MULTI", trx.trx.exec(true).await)
697    }
698
699    pub async fn wait(&self, replicas: i64, timeout: i64) -> RedisResult<i64> {
700        self.ensure_connected().await?;
701        let start = Instant::now();
702        self.record(start, "WAIT", self.pool.wait(replicas, timeout).await)
703    }
704
705    pub async fn info_connected_slaves(&self) -> RedisResult<u8> {
706        self.ensure_connected().await?;
707        let start = Instant::now();
708        let info: String = self.record(
709            start,
710            "INFO",
711            self.pool.info(Some(InfoKind::Replication)).await,
712        )?;
713        let info: HashMap<&str, &str> =
714            HashMap::from_iter(info.lines().filter_map(|line| line.trim().split_once(':')));
715        debug!(info = format!("{:?}", info), "Redis replication info");
716        let connected_slaves = info
717            .get("connected_slaves")
718            .and_then(|s| s.parse().ok())
719            .unwrap_or(0);
720        Ok(connected_slaves)
721    }
722
723    pub async fn scan<K>(
724        &self,
725        pattern: K,
726        cursor: u64,
727        count: u64,
728    ) -> RedisResult<(u64, Vec<String>)>
729    where
730        K: AsRef<str>,
731    {
732        self.ensure_connected().await?;
733        let start = Instant::now();
734
735        //https://redis.io/commands/scan/
736        let args: Vec<String> = vec![
737            cursor.to_string(),
738            "MATCH".to_string(),
739            self.prefixed_key(pattern),
740            "COUNT".to_string(),
741            count.to_string(),
742        ];
743
744        //https://github.com/aembke/fred.rs/blob/3a91ee9bc12faff9d32627c0db2c9b24c54efa03/examples/custom.rs#L7
745
746        self.record(
747            start,
748            "SCAN",
749            self.pool
750                .next()
751                .custom_raw(cmd!("SCAN"), args)
752                .await
753                .and_then(|f| self.parse_key_scan_frame(f)),
754        )
755    }
756
757    pub async fn keys<K>(&self, pattern: K) -> RedisResult<Vec<String>>
758    where
759        K: AsRef<str>,
760    {
761        self.ensure_connected().await?;
762        let start = Instant::now();
763
764        //https://redis.io/commands/keys/
765        let args: Vec<String> = vec![self.prefixed_key(pattern)];
766
767        //https://github.com/aembke/fred.rs/blob/3a91ee9bc12faff9d32627c0db2c9b24c54efa03/examples/custom.rs#L7
768
769        self.record(
770            start,
771            "KEYS",
772            self.pool
773                .next()
774                .custom_raw(cmd!("KEYS"), args)
775                .await
776                .and_then(|f| f.try_into())
777                .and_then(|v: RedisValue| v.convert::<Vec<String>>())
778                .map(|keys| {
779                    keys.into_iter()
780                        .map(|key| key[self.key_prefix.len()..].to_string())
781                        .collect()
782                }),
783        )
784    }
785
786    fn parse_key_scan_frame(&self, frame: Resp3Frame) -> RedisResult<(u64, Vec<String>)> {
787        use fred::prelude::*;
788        if let Resp3Frame::Array { mut data, .. } = frame {
789            if data.len() == 2 {
790                let cursor: u64 = data[0]
791                    .clone()
792                    .try_into()
793                    .and_then(|value: RedisValue| value.convert())?;
794
795                if let Some(Resp3Frame::Array { data, .. }) = data.pop() {
796                    let mut keys = Vec::with_capacity(data.len());
797
798                    let key_prefix_len = self.key_prefix.len();
799
800                    for frame in data.into_iter() {
801                        let key: String = frame
802                            .try_into()
803                            .and_then(|value: RedisValue| value.convert())?;
804
805                        if key_prefix_len > 0 {
806                            keys.push(key[key_prefix_len..].to_string());
807                        } else {
808                            keys.push(key);
809                        }
810                    }
811
812                    Ok((cursor, keys))
813                } else {
814                    Err(RedisError::new(
815                        RedisErrorKind::Protocol,
816                        "Expected second SCAN result element to be an array.",
817                    ))
818                }
819            } else {
820                Err(RedisError::new(
821                    RedisErrorKind::Protocol,
822                    "Expected two-element bulk string array from SCAN.",
823                ))
824            }
825        } else {
826            Err(RedisError::new(
827                RedisErrorKind::Protocol,
828                "Expected bulk string array from SCAN.",
829            ))
830        }
831    }
832}
833
834pub struct RedisTransaction {
835    trx: Transaction,
836    key_prefix: String,
837}
838
839impl RedisTransaction {
840    fn new(trx: Transaction, key_prefix: String) -> Self {
841        Self { trx, key_prefix }
842    }
843
844    fn prefixed_key<K>(&self, key: K) -> String
845    where
846        K: AsRef<str>,
847    {
848        format!("{}{}", &self.key_prefix, key.as_ref())
849    }
850
851    pub async fn del<K>(&self, key: K) -> RedisResult<()>
852    where
853        K: AsRef<str>,
854    {
855        self.trx.del(self.prefixed_key(key)).await
856    }
857
858    pub async fn set<K, V>(
859        &self,
860        key: K,
861        value: V,
862        expire: Option<Expiration>,
863        options: Option<SetOptions>,
864        get: bool,
865    ) -> RedisResult<()>
866    where
867        K: AsRef<str>,
868        V: TryInto<RedisValue> + Send,
869        V::Error: Into<RedisError> + Send,
870    {
871        self.trx
872            .set(self.prefixed_key(key), value, expire, options, get)
873            .await
874    }
875
876    pub async fn sadd<K, V>(&self, key: K, members: V) -> RedisResult<()>
877    where
878        K: AsRef<str>,
879        V: TryInto<MultipleValues> + Send,
880        V::Error: Into<RedisError> + Send,
881    {
882        self.trx.sadd(self.prefixed_key(key), members).await
883    }
884
885    pub async fn srem<K, V>(&self, key: K, members: V) -> RedisResult<()>
886    where
887        K: AsRef<str>,
888        V: TryInto<MultipleValues> + Send,
889        V::Error: Into<RedisError> + Send,
890    {
891        self.trx.srem(self.prefixed_key(key), members).await
892    }
893
894    pub async fn scard<K>(&self, key: K) -> RedisResult<()>
895    where
896        K: AsRef<str>,
897    {
898        self.trx.scard(self.prefixed_key(key)).await
899    }
900}