simple_redis_wrapper/client/
redis_async_client.rs1use crate::client::types::{EvictionPolicy, Key, Namespace, Prefix};
2use anyhow::anyhow;
3use redis::{AsyncCommands, cmd};
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6use std::env;
7
8pub struct RedisAsyncClient {
9 pub url: String,
10 pub connection: redis::aio::MultiplexedConnection,
11 pub namespace: Namespace,
12}
13
14impl Clone for RedisAsyncClient {
15 fn clone(&self) -> Self {
16 Self {
17 url: self.url.clone(),
18 connection: self.connection.clone(),
19 namespace: self.namespace.clone(),
20 }
21 }
22}
23
24impl RedisAsyncClient {
25 pub async fn new(url: Option<String>, namespace: Namespace) -> anyhow::Result<Self> {
26 let url = url.unwrap_or(env::var("REDIS_URL")?);
27 let client = redis::Client::open(url.clone())?;
28 let connection = client.get_multiplexed_async_connection().await?;
29 Ok(Self {
30 url,
31 connection,
32 namespace,
33 })
34 }
35
36 pub async fn set_eviction_policy(
37 &self,
38 eviction_policy: EvictionPolicy,
39 ) -> anyhow::Result<String> {
40 let _: () = cmd("CONFIG")
41 .arg("SET")
42 .arg("maxmemory-policy")
43 .arg(eviction_policy.to_string())
44 .query_async(&mut self.connection())
45 .await?;
46 self.get_eviction_policy().await
47 }
48
49 pub async fn get_eviction_policy(&self) -> anyhow::Result<String> {
50 let current_policy: Vec<String> = cmd("CONFIG")
51 .arg("GET")
52 .arg("maxmemory-policy")
53 .query_async(&mut self.connection())
54 .await?;
55 Ok(current_policy.join(""))
56 }
57
58 pub fn key(&self, prefix: &Prefix, key: &Key) -> String {
59 format!("{}:{}:{}", self.namespace.0, prefix.0, key.0)
60 }
61
62 pub fn connection(&self) -> redis::aio::MultiplexedConnection {
63 self.connection.clone()
64 }
65
66 pub async fn get_entity<T>(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<T>
67 where
68 T: DeserializeOwned + Serialize,
69 {
70 let redis_str: Option<String> = self.connection().get(self.key(prefix, key)).await?;
71 match redis_str {
72 Some(string) => {
73 let redis_entity: T = serde_json::from_str(&string)
74 .map_err(|e| anyhow!("get_entity serde_json error: {}", e))?;
75 Ok(redis_entity)
76 }
77 None => Err(anyhow!("Didn't find entity")),
78 }
79 }
80
81 pub async fn save_entity<T>(
82 &self,
83 prefix: &Prefix,
84 key: &Key,
85 value: &T,
86 expiry: Option<u64>,
87 ) -> anyhow::Result<()>
88 where
89 T: DeserializeOwned + Serialize,
90 {
91 let value_str = serde_json::to_string(&value)
92 .map_err(|e| anyhow!("save_entity serde_json error: {}", e))?;
93 match expiry {
94 Some(expiry) => {
95 let _: () = self
96 .connection()
97 .set_ex(self.key(prefix, key), value_str, expiry)
98 .await?;
99 }
100 None => {
101 let _: () = self
102 .connection()
103 .set(self.key(prefix, key), value_str)
104 .await?;
105 }
106 }
107 Ok(())
108 }
109
110 pub async fn remove_entity(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<()> {
111 let _: () = self.connection().del(self.key(prefix, key)).await?;
112 Ok(())
113 }
114}