simple_redis_wrapper/client/
redis_async_client.rs

1use crate::client::types::{EvictionPolicy, Key, Namespace, Prefix};
2use anyhow::anyhow;
3use redis::{AsyncCommands, cmd};
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6use std::env;
7
8pub struct RedisAsyncClient {
9    pub url: String,
10    pub connection: redis::aio::MultiplexedConnection,
11    pub namespace: Namespace,
12}
13
14impl Clone for RedisAsyncClient {
15    fn clone(&self) -> Self {
16        Self {
17            url: self.url.clone(),
18            connection: self.connection.clone(),
19            namespace: self.namespace.clone(),
20        }
21    }
22}
23
24impl RedisAsyncClient {
25    pub async fn new(url: Option<String>, namespace: Namespace) -> anyhow::Result<Self> {
26        let url = url.unwrap_or(env::var("REDIS_URL")?);
27        let client = redis::Client::open(url.clone())?;
28        let connection = client.get_multiplexed_async_connection().await?;
29        Ok(Self {
30            url,
31            connection,
32            namespace,
33        })
34    }
35
36    pub async fn set_eviction_policy(
37        &self,
38        eviction_policy: EvictionPolicy,
39    ) -> anyhow::Result<String> {
40        let _: () = cmd("CONFIG")
41            .arg("SET")
42            .arg("maxmemory-policy")
43            .arg(eviction_policy.to_string())
44            .query_async(&mut self.connection())
45            .await?;
46        self.get_eviction_policy().await
47    }
48
49    pub async fn get_eviction_policy(&self) -> anyhow::Result<String> {
50        let current_policy: Vec<String> = cmd("CONFIG")
51            .arg("GET")
52            .arg("maxmemory-policy")
53            .query_async(&mut self.connection())
54            .await?;
55        Ok(current_policy.join(""))
56    }
57
58    pub fn key(&self, prefix: &Prefix, key: &Key) -> String {
59        format!("{}:{}:{}", self.namespace.0, prefix.0, key.0)
60    }
61
62    pub fn connection(&self) -> redis::aio::MultiplexedConnection {
63        self.connection.clone()
64    }
65
66    pub async fn get_entity<T>(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<T>
67    where
68        T: DeserializeOwned + Serialize,
69    {
70        let redis_str: Option<String> = self.connection().get(self.key(prefix, key)).await?;
71        match redis_str {
72            Some(string) => {
73                let redis_entity: T = serde_json::from_str(&string)
74                    .map_err(|e| anyhow!("get_entity serde_json error: {}", e))?;
75                Ok(redis_entity)
76            }
77            None => Err(anyhow!("Didn't find entity")),
78        }
79    }
80
81    pub async fn save_entity<T>(
82        &self,
83        prefix: &Prefix,
84        key: &Key,
85        value: &T,
86        expiry: Option<u64>,
87    ) -> anyhow::Result<()>
88    where
89        T: DeserializeOwned + Serialize,
90    {
91        let value_str = serde_json::to_string(&value)
92            .map_err(|e| anyhow!("save_entity serde_json error: {}", e))?;
93        match expiry {
94            Some(expiry) => {
95                let _: () = self
96                    .connection()
97                    .set_ex(self.key(prefix, key), value_str, expiry)
98                    .await?;
99            }
100            None => {
101                let _: () = self
102                    .connection()
103                    .set(self.key(prefix, key), value_str)
104                    .await?;
105            }
106        }
107        Ok(())
108    }
109
110    pub async fn remove_entity(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<()> {
111        let _: () = self.connection().del(self.key(prefix, key)).await?;
112        Ok(())
113    }
114}