simple_redis_wrapper/client/
redis_async_client.rs

1use crate::client::types::{EvictionPolicy, Key, Namespace, Prefix};
2use anyhow::anyhow;
3use redis::{AsyncCommands, cmd};
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6use std::env;
7
8pub struct RedisAsyncClient {
9    pub url: String,
10    pub connection: redis::aio::MultiplexedConnection,
11    pub namespace: Namespace,
12}
13
14impl Clone for RedisAsyncClient {
15    fn clone(&self) -> Self {
16        Self {
17            url: self.url.clone(),
18            connection: self.connection.clone(),
19            namespace: self.namespace.clone(),
20        }
21    }
22}
23
24impl RedisAsyncClient {
25    pub async fn new(url: Option<String>, namespace: Namespace) -> anyhow::Result<Self> {
26        let url = url.unwrap_or(env::var("REDIS_URL")?);
27        let url = if url.ends_with("#insecure") {
28            url
29        } else {
30            format!("{}#insecure", url)
31        };
32        let client = redis::Client::open(url.clone())?;
33        let connection = client.get_multiplexed_async_connection().await?;
34        Ok(Self {
35            url,
36            connection,
37            namespace,
38        })
39    }
40
41    pub async fn set_eviction_policy(
42        &self,
43        eviction_policy: EvictionPolicy,
44    ) -> anyhow::Result<String> {
45        let _: () = cmd("CONFIG")
46            .arg("SET")
47            .arg("maxmemory-policy")
48            .arg(eviction_policy.to_string())
49            .query_async(&mut self.connection())
50            .await?;
51        self.get_eviction_policy().await
52    }
53
54    pub async fn get_eviction_policy(&self) -> anyhow::Result<String> {
55        let current_policy: Vec<String> = cmd("CONFIG")
56            .arg("GET")
57            .arg("maxmemory-policy")
58            .query_async(&mut self.connection())
59            .await?;
60        Ok(current_policy.join(""))
61    }
62
63    pub fn key(&self, prefix: &Prefix, key: &Key) -> String {
64        format!("{}:{}:{}", self.namespace.0, prefix.0, key.0)
65    }
66
67    pub fn connection(&self) -> redis::aio::MultiplexedConnection {
68        self.connection.clone()
69    }
70
71    pub async fn get_entity<T>(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<T>
72    where
73        T: DeserializeOwned + Serialize,
74    {
75        let redis_str: Option<String> = self.connection().get(self.key(prefix, key)).await?;
76        match redis_str {
77            Some(string) => {
78                let redis_entity: T = serde_json::from_str(&string)
79                    .map_err(|e| anyhow!("get_entity serde_json error: {}", e))?;
80                Ok(redis_entity)
81            }
82            None => Err(anyhow!("Didn't find entity")),
83        }
84    }
85
86    pub async fn save_entity<T>(
87        &self,
88        prefix: &Prefix,
89        key: &Key,
90        value: &T,
91        expiry: Option<u64>,
92    ) -> anyhow::Result<()>
93    where
94        T: DeserializeOwned + Serialize,
95    {
96        let value_str = serde_json::to_string(&value)
97            .map_err(|e| anyhow!("save_entity serde_json error: {}", e))?;
98        match expiry {
99            Some(expiry) => {
100                let _: () = self
101                    .connection()
102                    .set_ex(self.key(prefix, key), value_str, expiry)
103                    .await?;
104            }
105            None => {
106                let _: () = self
107                    .connection()
108                    .set(self.key(prefix, key), value_str)
109                    .await?;
110            }
111        }
112        Ok(())
113    }
114
115    pub async fn remove_entity(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<()> {
116        let _: () = self.connection().del(self.key(prefix, key)).await?;
117        Ok(())
118    }
119}