simple_redis_wrapper/client/
redis_async_client.rs

1use crate::client::types::{EvictionPolicy, Key, Namespace, Prefix};
2use anyhow::anyhow;
3use redis::aio::ConnectionManager;
4use redis::{AsyncCommands, cmd};
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use std::env;
8
9pub struct RedisAsyncClient {
10    pub url: String,
11    pub connection: ConnectionManager,
12    pub namespace: Namespace,
13}
14
15impl Clone for RedisAsyncClient {
16    fn clone(&self) -> Self {
17        Self {
18            url: self.url.clone(),
19            connection: self.connection.clone(),
20            namespace: self.namespace.clone(),
21        }
22    }
23}
24
25impl RedisAsyncClient {
26    pub async fn new(url: Option<String>, namespace: Namespace) -> anyhow::Result<Self> {
27        let url = url.unwrap_or(env::var("REDIS_URL")?);
28        let url = if url.ends_with("#insecure") {
29            url
30        } else {
31            format!("{}#insecure", url)
32        };
33        let client = redis::Client::open(url.clone())?;
34        let connection = ConnectionManager::new(client).await?;
35        // let connection = client.get_multiplexed_async_connection().await?;
36        Ok(Self {
37            url,
38            connection,
39            namespace,
40        })
41    }
42
43    pub async fn set_eviction_policy(
44        &self,
45        eviction_policy: EvictionPolicy,
46    ) -> anyhow::Result<String> {
47        let _: () = cmd("CONFIG")
48            .arg("SET")
49            .arg("maxmemory-policy")
50            .arg(eviction_policy.to_string())
51            .query_async(&mut self.connection())
52            .await?;
53        self.get_eviction_policy().await
54    }
55
56    pub async fn get_eviction_policy(&self) -> anyhow::Result<String> {
57        let current_policy: Vec<String> = cmd("CONFIG")
58            .arg("GET")
59            .arg("maxmemory-policy")
60            .query_async(&mut self.connection())
61            .await?;
62        Ok(current_policy.join(""))
63    }
64
65    pub fn key(&self, prefix: &Prefix, key: &Key) -> String {
66        format!("{}:{}:{}", self.namespace.0, prefix.0, key.0)
67    }
68
69    pub fn connection(&self) -> ConnectionManager {
70        self.connection.clone()
71    }
72
73    pub async fn get(&self, key: &str) -> anyhow::Result<Option<String>> {
74        let redis_str: Option<String> = AsyncCommands::get(&mut self.connection(), key).await?;
75        Ok(redis_str)
76    }
77
78    pub async fn set_ex(&self, key: &str, value: &str, expiry: Option<u64>) -> anyhow::Result<()> {
79        match expiry {
80            Some(expiry) => {
81                let _: () =
82                    AsyncCommands::set_ex(&mut self.connection(), key, value, expiry).await?;
83            }
84            None => {
85                let _: () = AsyncCommands::set(&mut self.connection(), key, value).await?;
86            }
87        }
88        Ok(())
89    }
90
91    pub async fn get_all(&self) -> anyhow::Result<Vec<(String, String)>> {
92        let mut output: Vec<(String, String)> = Vec::new();
93        let keys: Vec<String> = AsyncCommands::keys(&mut self.connection(), "*").await?;
94        for key in keys {
95            match self.get(&key).await? {
96                Some(value) => output.push((key, value)),
97                None => {}
98            }
99        }
100        Ok(output)
101    }
102
103    pub async fn remove(&self, key: &str) -> anyhow::Result<()> {
104        let _: () = AsyncCommands::del(&mut self.connection(), key).await?;
105        Ok(())
106    }
107
108    pub async fn get_entity<T>(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<T>
109    where
110        T: DeserializeOwned + Serialize,
111    {
112        let redis_str: Option<String> =
113            AsyncCommands::get(&mut self.connection(), self.key(prefix, key)).await?;
114        match redis_str {
115            Some(string) => {
116                let redis_entity: T = serde_json::from_str(&string)
117                    .map_err(|e| anyhow!("get_entity serde_json error: {}", e))?;
118                Ok(redis_entity)
119            }
120            None => Err(anyhow!("Didn't find entity")),
121        }
122    }
123
124    pub async fn save_entity<T>(
125        &self,
126        prefix: &Prefix,
127        key: &Key,
128        value: &T,
129        expiry: Option<u64>,
130    ) -> anyhow::Result<()>
131    where
132        T: DeserializeOwned + Serialize,
133    {
134        let value_str = serde_json::to_string(&value)
135            .map_err(|e| anyhow!("save_entity serde_json error: {}", e))?;
136        match expiry {
137            Some(expiry) => {
138                let _: () = AsyncCommands::set_ex(
139                    &mut self.connection(),
140                    self.key(prefix, key),
141                    value_str,
142                    expiry,
143                )
144                .await?;
145            }
146            None => {
147                let _: () =
148                    AsyncCommands::set(&mut self.connection(), self.key(prefix, key), value_str)
149                        .await?;
150            }
151        }
152        Ok(())
153    }
154
155    pub async fn remove_entity(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<()> {
156        let _: () = AsyncCommands::del(&mut self.connection(), self.key(prefix, key)).await?;
157        Ok(())
158    }
159}