actix_cloud/memorydb/
redis.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use redis::{aio::ConnectionManager, AsyncCommands, Expiry};
5
6use super::interface::MemoryDB;
7use crate::Result;
8
9#[derive(Clone)]
10pub struct RedisBackend {
11    client: ConnectionManager,
12}
13
14impl RedisBackend {
15    pub async fn new(dsn: &str) -> Result<Self> {
16        let client = ConnectionManager::new(redis::Client::open(dsn)?).await?;
17        Ok(Self { client })
18    }
19}
20
21#[async_trait]
22impl MemoryDB for RedisBackend {
23    async fn set(&self, key: &str, value: &str) -> Result<()> {
24        self.client
25            .clone()
26            .set(key, value)
27            .await
28            .map_err(Into::into)
29    }
30
31    async fn get(&self, key: &str) -> Result<Option<String>> {
32        self.client.clone().get(key).await.map_err(Into::into)
33    }
34
35    async fn get_del(&self, key: &str) -> Result<Option<String>> {
36        self.client.clone().get_del(key).await.map_err(Into::into)
37    }
38
39    async fn get_ex(&self, key: &str, ttl: &Duration) -> Result<Option<String>> {
40        self.client
41            .clone()
42            .get_ex(key, Expiry::EX(ttl.as_secs()))
43            .await
44            .map_err(Into::into)
45    }
46
47    async fn set_ex(&self, key: &str, value: &str, ttl: &Duration) -> Result<()> {
48        self.client
49            .clone()
50            .set_ex(key, value, ttl.as_secs())
51            .await
52            .map_err(Into::into)
53    }
54
55    async fn del(&self, key: &str) -> Result<bool> {
56        self.client.clone().del(key).await.map_err(Into::into)
57    }
58
59    async fn expire(&self, key: &str, ttl: i64) -> Result<bool> {
60        self.client
61            .clone()
62            .expire(key, ttl)
63            .await
64            .map_err(Into::into)
65    }
66
67    async fn flush(&self) -> Result<()> {
68        redis::cmd("FLUSHDB")
69            .query_async(&mut self.client.clone())
70            .await
71            .map_err(Into::into)
72    }
73
74    async fn keys(&self, key: &str) -> Result<Vec<String>> {
75        self.client.clone().keys(key).await.map_err(Into::into)
76    }
77
78    async fn dels(&self, keys: &[String]) -> Result<u64> {
79        let mut p = redis::pipe();
80        let mut p = p.atomic();
81        for i in keys {
82            p = p.del(i);
83        }
84        let res: Vec<u64> = p.query_async(&mut self.client.clone()).await?;
85        Ok(res.into_iter().sum())
86    }
87
88    async fn ttl(&self, key: &str) -> Result<Option<i64>> {
89        let ret: i64 = self.client.clone().ttl(key).await?;
90        if ret <= 0 {
91            Ok(None)
92        } else {
93            Ok(Some(ret))
94        }
95    }
96}