simple_redis_wrapper/client/
redis_async_client.rs

1use crate::client::types::{EvictionPolicy, Key, Namespace, Prefix};
2use anyhow::anyhow;
3use redis::aio::ConnectionManager;
4use redis::{AsyncCommands, cmd};
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use std::env;
8
9pub struct RedisAsyncClient {
10    pub url: String,
11    pub connection: ConnectionManager,
12    pub namespace: Namespace,
13}
14
15impl Clone for RedisAsyncClient {
16    fn clone(&self) -> Self {
17        Self {
18            url: self.url.clone(),
19            connection: self.connection.clone(),
20            namespace: self.namespace.clone(),
21        }
22    }
23}
24
25impl RedisAsyncClient {
26    pub async fn new(url: Option<String>, namespace: Namespace) -> anyhow::Result<Self> {
27        let url = url.unwrap_or(env::var("REDIS_URL")?);
28        let url = if url.ends_with("#insecure") {
29            url
30        } else {
31            format!("{}#insecure", url)
32        };
33        let client = redis::Client::open(url.clone())?;
34        let connection = ConnectionManager::new(client).await?;
35        // let connection = client.get_multiplexed_async_connection().await?;
36        Ok(Self {
37            url,
38            connection,
39            namespace,
40        })
41    }
42
43    pub async fn set_eviction_policy(
44        &self,
45        eviction_policy: EvictionPolicy,
46    ) -> anyhow::Result<String> {
47        let _: () = cmd("CONFIG")
48            .arg("SET")
49            .arg("maxmemory-policy")
50            .arg(eviction_policy.to_string())
51            .query_async(&mut self.connection())
52            .await?;
53        self.get_eviction_policy().await
54    }
55
56    pub async fn get_eviction_policy(&self) -> anyhow::Result<String> {
57        let current_policy: Vec<String> = cmd("CONFIG")
58            .arg("GET")
59            .arg("maxmemory-policy")
60            .query_async(&mut self.connection())
61            .await?;
62        Ok(current_policy.join(""))
63    }
64
65    pub fn key(&self, prefix: &Prefix, key: &Key) -> String {
66        format!("{}:{}:{}", self.namespace.0, prefix.0, key.0)
67    }
68
69    pub fn connection(&self) -> ConnectionManager {
70        self.connection.clone()
71    }
72
73    pub async fn get(&self, key: &str) -> anyhow::Result<Option<String>> {
74        let redis_str: Option<String> = AsyncCommands::get(&mut self.connection(), key).await?;
75        Ok(redis_str)
76    }
77
78    pub async fn set_ex(&self, key: &str, value: &str, expiry: Option<u64>) -> anyhow::Result<()> {
79        match expiry {
80            Some(expiry) => {
81                let _: () =
82                    AsyncCommands::set_ex(&mut self.connection(), key, value, expiry).await?;
83            }
84            None => {
85                let _: () = AsyncCommands::set(&mut self.connection(), key, value).await?;
86            }
87        }
88        Ok(())
89    }
90
91    pub async fn remove(&self, key: &str) -> anyhow::Result<()> {
92        let _: () = AsyncCommands::del(&mut self.connection(), key).await?;
93        Ok(())
94    }
95
96    pub async fn get_entity<T>(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<T>
97    where
98        T: DeserializeOwned + Serialize,
99    {
100        let redis_str: Option<String> =
101            AsyncCommands::get(&mut self.connection(), self.key(prefix, key)).await?;
102        match redis_str {
103            Some(string) => {
104                let redis_entity: T = serde_json::from_str(&string)
105                    .map_err(|e| anyhow!("get_entity serde_json error: {}", e))?;
106                Ok(redis_entity)
107            }
108            None => Err(anyhow!("Didn't find entity")),
109        }
110    }
111
112    pub async fn save_entity<T>(
113        &self,
114        prefix: &Prefix,
115        key: &Key,
116        value: &T,
117        expiry: Option<u64>,
118    ) -> anyhow::Result<()>
119    where
120        T: DeserializeOwned + Serialize,
121    {
122        let value_str = serde_json::to_string(&value)
123            .map_err(|e| anyhow!("save_entity serde_json error: {}", e))?;
124        match expiry {
125            Some(expiry) => {
126                let _: () = AsyncCommands::set_ex(
127                    &mut self.connection(),
128                    self.key(prefix, key),
129                    value_str,
130                    expiry,
131                )
132                .await?;
133            }
134            None => {
135                let _: () =
136                    AsyncCommands::set(&mut self.connection(), self.key(prefix, key), value_str)
137                        .await?;
138            }
139        }
140        Ok(())
141    }
142
143    pub async fn remove_entity(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<()> {
144        let _: () = AsyncCommands::del(&mut self.connection(), self.key(prefix, key)).await?;
145        Ok(())
146    }
147}