simple_redis_wrapper/client/
redis_async_client.rs1use crate::client::types::{EvictionPolicy, Key, Namespace, Prefix};
2use anyhow::anyhow;
3use redis::{AsyncCommands, cmd};
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6use std::env;
7
8pub struct RedisAsyncClient {
9 pub url: String,
10 pub client: redis::Client,
11 pub connection: redis::aio::MultiplexedConnection,
12 pub namespace: Namespace,
13}
14
15impl RedisAsyncClient {
16 pub async fn new(url: Option<String>, namespace: Namespace) -> anyhow::Result<Self> {
17 let url = url.unwrap_or(env::var("REDIS_URL")?);
18 let client = redis::Client::open(url.clone())?;
19 let connection = client.get_multiplexed_async_connection().await?;
20 Ok(Self {
21 url,
22 client,
23 connection,
24 namespace,
25 })
26 }
27
28 pub async fn set_eviction_policy(
29 &self,
30 eviction_policy: EvictionPolicy,
31 ) -> anyhow::Result<String> {
32 let _: () = cmd("CONFIG")
33 .arg("SET")
34 .arg("maxmemory-policy")
35 .arg(eviction_policy.to_string())
36 .query_async(&mut self.connection())
37 .await?;
38 self.get_eviction_policy().await
39 }
40
41 pub async fn get_eviction_policy(&self) -> anyhow::Result<String> {
42 let current_policy: Vec<String> = cmd("CONFIG")
43 .arg("GET")
44 .arg("maxmemory-policy")
45 .query_async(&mut self.connection())
46 .await?;
47 Ok(current_policy.join(""))
48 }
49
50 pub fn key(&self, prefix: &Prefix, key: &Key) -> String {
51 format!("{}:{}:{}", self.namespace.0, prefix.0, key.0)
52 }
53
54 pub fn connection(&self) -> redis::aio::MultiplexedConnection {
55 self.connection.clone()
56 }
57
58 pub async fn get_entity<T>(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<T>
59 where
60 T: DeserializeOwned + Serialize,
61 {
62 let redis_str: Option<String> = self.connection().get(self.key(prefix, key)).await?;
63 match redis_str {
64 Some(string) => {
65 let redis_entity: T = serde_json::from_str(&string)
66 .map_err(|e| anyhow!("get_entity serde_json error: {}", e))?;
67 Ok(redis_entity)
68 }
69 None => Err(anyhow!("Didn't find entity")),
70 }
71 }
72
73 pub async fn save_entity<T>(
74 &self,
75 prefix: &Prefix,
76 key: &Key,
77 value: &T,
78 expiry: Option<u64>,
79 ) -> anyhow::Result<()>
80 where
81 T: DeserializeOwned + Serialize,
82 {
83 let value_str = serde_json::to_string(&value)
84 .map_err(|e| anyhow!("save_entity serde_json error: {}", e))?;
85 match expiry {
86 Some(expiry) => {
87 let _: () = self
88 .connection()
89 .set_ex(self.key(prefix, key), value_str, expiry)
90 .await?;
91 }
92 None => {
93 let _: () = self
94 .connection()
95 .set(self.key(prefix, key), value_str)
96 .await?;
97 }
98 }
99 Ok(())
100 }
101
102 pub async fn remove_entity(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<()> {
103 let _: () = self.connection().del(self.key(prefix, key)).await?;
104 Ok(())
105 }
106}