simple_redis_wrapper/client/
redis_async_client.rs

1use crate::client::types::{EvictionPolicy, Key, Namespace, Prefix};
2use anyhow::anyhow;
3use redis::aio::ConnectionManager;
4use redis::{AsyncCommands, cmd};
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use std::env;
8
9pub struct RedisAsyncClient {
10    pub url: String,
11    pub connection: ConnectionManager,
12    pub namespace: Namespace,
13}
14
15impl Clone for RedisAsyncClient {
16    fn clone(&self) -> Self {
17        Self {
18            url: self.url.clone(),
19            connection: self.connection.clone(),
20            namespace: self.namespace.clone(),
21        }
22    }
23}
24
25impl RedisAsyncClient {
26    pub async fn new(url: Option<String>, namespace: Namespace) -> anyhow::Result<Self> {
27        let url = url.unwrap_or(env::var("REDIS_URL")?);
28        let url = if url.ends_with("#insecure") {
29            url
30        } else {
31            format!("{}#insecure", url)
32        };
33        let client = redis::Client::open(url.clone())?;
34        let connection = ConnectionManager::new(client).await?;
35        // let connection = client.get_multiplexed_async_connection().await?;
36        Ok(Self {
37            url,
38            connection,
39            namespace,
40        })
41    }
42
43    pub async fn set_eviction_policy(
44        &self,
45        eviction_policy: EvictionPolicy,
46    ) -> anyhow::Result<String> {
47        let _: () = cmd("CONFIG")
48            .arg("SET")
49            .arg("maxmemory-policy")
50            .arg(eviction_policy.to_string())
51            .query_async(&mut self.connection())
52            .await?;
53        self.get_eviction_policy().await
54    }
55
56    pub async fn get_eviction_policy(&self) -> anyhow::Result<String> {
57        let current_policy: Vec<String> = cmd("CONFIG")
58            .arg("GET")
59            .arg("maxmemory-policy")
60            .query_async(&mut self.connection())
61            .await?;
62        Ok(current_policy.join(""))
63    }
64
65    pub fn key(&self, prefix: &Prefix, key: &Key) -> String {
66        format!("{}:{}:{}", self.namespace.0, prefix.0, key.0)
67    }
68
69    pub fn connection(&self) -> ConnectionManager {
70        self.connection.clone()
71    }
72
73    pub async fn get(&self, key: &str) -> anyhow::Result<Option<String>> {
74        let redis_str: Option<String> = AsyncCommands::get(&mut self.connection(), key).await?;
75        Ok(redis_str)
76    }
77
78    pub async fn set_ex(&self, key: &str, value: &str, expiry: Option<u64>) -> anyhow::Result<()> {
79        match expiry {
80            Some(expiry) => {
81                let _: () =
82                    AsyncCommands::set_ex(&mut self.connection(), key, value, expiry).await?;
83            }
84            None => {
85                let _: () = AsyncCommands::set(&mut self.connection(), key, value).await?;
86            }
87        }
88        Ok(())
89    }
90
91    pub async fn get_all(&self) -> anyhow::Result<Vec<(String, String)>> {
92        let mut output: Vec<(String, String)> = Vec::new();
93        let keys: Vec<String> = AsyncCommands::keys(&mut self.connection(), "*").await?;
94        for key in keys {
95            if let Some(value) = self.get(&key).await? {
96                output.push((key, value))
97            }
98        }
99        Ok(output)
100    }
101
102    pub async fn remove(&self, key: &str) -> anyhow::Result<()> {
103        let _: () = AsyncCommands::del(&mut self.connection(), key).await?;
104        Ok(())
105    }
106
107    pub async fn get_entity<T>(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<T>
108    where
109        T: DeserializeOwned + Serialize,
110    {
111        let redis_str: Option<String> =
112            AsyncCommands::get(&mut self.connection(), self.key(prefix, key)).await?;
113        match redis_str {
114            Some(string) => {
115                let redis_entity: T = serde_json::from_str(&string)
116                    .map_err(|e| anyhow!("get_entity serde_json error: {}", e))?;
117                Ok(redis_entity)
118            }
119            None => Err(anyhow!("Didn't find entity")),
120        }
121    }
122
123    pub async fn save_entity<T>(
124        &self,
125        prefix: &Prefix,
126        key: &Key,
127        value: &T,
128        expiry: Option<u64>,
129    ) -> anyhow::Result<()>
130    where
131        T: DeserializeOwned + Serialize,
132    {
133        let value_str = serde_json::to_string(&value)
134            .map_err(|e| anyhow!("save_entity serde_json error: {}", e))?;
135        match expiry {
136            Some(expiry) => {
137                let _: () = AsyncCommands::set_ex(
138                    &mut self.connection(),
139                    self.key(prefix, key),
140                    value_str,
141                    expiry,
142                )
143                .await?;
144            }
145            None => {
146                let _: () =
147                    AsyncCommands::set(&mut self.connection(), self.key(prefix, key), value_str)
148                        .await?;
149            }
150        }
151        Ok(())
152    }
153
154    pub async fn remove_entity<T>(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<()> {
155        let _: () = AsyncCommands::del(&mut self.connection(), self.key(prefix, key)).await?;
156        Ok(())
157    }
158}