simple_redis_wrapper/client/
redis_async_client.rs1use crate::client::types::{EvictionPolicy, Key, Namespace, Prefix};
2use anyhow::anyhow;
3use redis::{AsyncCommands, cmd};
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6use std::env;
7
8pub struct RedisAsyncClient {
9 pub url: String,
10 pub connection: redis::aio::MultiplexedConnection,
11 pub namespace: Namespace,
12}
13
14impl Clone for RedisAsyncClient {
15 fn clone(&self) -> Self {
16 Self {
17 url: self.url.clone(),
18 connection: self.connection.clone(),
19 namespace: self.namespace.clone(),
20 }
21 }
22}
23
24impl RedisAsyncClient {
25 pub async fn new(url: Option<String>, namespace: Namespace) -> anyhow::Result<Self> {
26 let url = url.unwrap_or(env::var("REDIS_URL")?);
27 let url = if url.ends_with("#insecure") {
28 url
29 } else {
30 format!("{}#insecure", url)
31 };
32 let client = redis::Client::open(url.clone())?;
33 let connection = client.get_multiplexed_async_connection().await?;
34 Ok(Self {
35 url,
36 connection,
37 namespace,
38 })
39 }
40
41 pub async fn set_eviction_policy(
42 &self,
43 eviction_policy: EvictionPolicy,
44 ) -> anyhow::Result<String> {
45 let _: () = cmd("CONFIG")
46 .arg("SET")
47 .arg("maxmemory-policy")
48 .arg(eviction_policy.to_string())
49 .query_async(&mut self.connection())
50 .await?;
51 self.get_eviction_policy().await
52 }
53
54 pub async fn get_eviction_policy(&self) -> anyhow::Result<String> {
55 let current_policy: Vec<String> = cmd("CONFIG")
56 .arg("GET")
57 .arg("maxmemory-policy")
58 .query_async(&mut self.connection())
59 .await?;
60 Ok(current_policy.join(""))
61 }
62
63 pub fn key(&self, prefix: &Prefix, key: &Key) -> String {
64 format!("{}:{}:{}", self.namespace.0, prefix.0, key.0)
65 }
66
67 pub fn connection(&self) -> redis::aio::MultiplexedConnection {
68 self.connection.clone()
69 }
70
71 pub async fn get_entity<T>(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<T>
72 where
73 T: DeserializeOwned + Serialize,
74 {
75 let redis_str: Option<String> = self.connection().get(self.key(prefix, key)).await?;
76 match redis_str {
77 Some(string) => {
78 let redis_entity: T = serde_json::from_str(&string)
79 .map_err(|e| anyhow!("get_entity serde_json error: {}", e))?;
80 Ok(redis_entity)
81 }
82 None => Err(anyhow!("Didn't find entity")),
83 }
84 }
85
86 pub async fn save_entity<T>(
87 &self,
88 prefix: &Prefix,
89 key: &Key,
90 value: &T,
91 expiry: Option<u64>,
92 ) -> anyhow::Result<()>
93 where
94 T: DeserializeOwned + Serialize,
95 {
96 let value_str = serde_json::to_string(&value)
97 .map_err(|e| anyhow!("save_entity serde_json error: {}", e))?;
98 match expiry {
99 Some(expiry) => {
100 let _: () = self
101 .connection()
102 .set_ex(self.key(prefix, key), value_str, expiry)
103 .await?;
104 }
105 None => {
106 let _: () = self
107 .connection()
108 .set(self.key(prefix, key), value_str)
109 .await?;
110 }
111 }
112 Ok(())
113 }
114
115 pub async fn remove_entity(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<()> {
116 let _: () = self.connection().del(self.key(prefix, key)).await?;
117 Ok(())
118 }
119}