simple_redis_wrapper/client/
redis_async_client.rs1use crate::client::types::{EvictionPolicy, Key, Namespace, Prefix};
2use anyhow::anyhow;
3use redis::aio::ConnectionManager;
4use redis::{AsyncCommands, cmd};
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use std::env;
8
9pub struct RedisAsyncClient {
10 pub url: String,
11 pub connection: ConnectionManager,
12 pub namespace: Namespace,
13}
14
15impl Clone for RedisAsyncClient {
16 fn clone(&self) -> Self {
17 Self {
18 url: self.url.clone(),
19 connection: self.connection.clone(),
20 namespace: self.namespace.clone(),
21 }
22 }
23}
24
25impl RedisAsyncClient {
26 pub async fn new(url: Option<String>, namespace: Namespace) -> anyhow::Result<Self> {
27 let url = url.unwrap_or(env::var("REDIS_URL")?);
28 let url = if url.ends_with("#insecure") {
29 url
30 } else {
31 format!("{}#insecure", url)
32 };
33 let client = redis::Client::open(url.clone())?;
34 let connection = ConnectionManager::new(client).await?;
35 Ok(Self {
37 url,
38 connection,
39 namespace,
40 })
41 }
42
43 pub async fn set_eviction_policy(
44 &self,
45 eviction_policy: EvictionPolicy,
46 ) -> anyhow::Result<String> {
47 let _: () = cmd("CONFIG")
48 .arg("SET")
49 .arg("maxmemory-policy")
50 .arg(eviction_policy.to_string())
51 .query_async(&mut self.connection())
52 .await?;
53 self.get_eviction_policy().await
54 }
55
56 pub async fn get_eviction_policy(&self) -> anyhow::Result<String> {
57 let current_policy: Vec<String> = cmd("CONFIG")
58 .arg("GET")
59 .arg("maxmemory-policy")
60 .query_async(&mut self.connection())
61 .await?;
62 Ok(current_policy.join(""))
63 }
64
65 pub fn key(&self, prefix: &Prefix, key: &Key) -> String {
66 format!("{}:{}:{}", self.namespace.0, prefix.0, key.0)
67 }
68
69 pub fn connection(&self) -> ConnectionManager {
70 self.connection.clone()
71 }
72
73 pub async fn get(&self, key: &str) -> anyhow::Result<Option<String>> {
74 let redis_str: Option<String> = AsyncCommands::get(&mut self.connection(), key).await?;
75 Ok(redis_str)
76 }
77
78 pub async fn set_ex(&self, key: &str, value: &str, expiry: Option<u64>) -> anyhow::Result<()> {
79 match expiry {
80 Some(expiry) => {
81 let _: () =
82 AsyncCommands::set_ex(&mut self.connection(), key, value, expiry).await?;
83 }
84 None => {
85 let _: () = AsyncCommands::set(&mut self.connection(), key, value).await?;
86 }
87 }
88 Ok(())
89 }
90
91 pub async fn remove(&self, key: &str) -> anyhow::Result<()> {
92 let _: () = AsyncCommands::del(&mut self.connection(), key).await?;
93 Ok(())
94 }
95
96 pub async fn get_entity<T>(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<T>
97 where
98 T: DeserializeOwned + Serialize,
99 {
100 let redis_str: Option<String> =
101 AsyncCommands::get(&mut self.connection(), self.key(prefix, key)).await?;
102 match redis_str {
103 Some(string) => {
104 let redis_entity: T = serde_json::from_str(&string)
105 .map_err(|e| anyhow!("get_entity serde_json error: {}", e))?;
106 Ok(redis_entity)
107 }
108 None => Err(anyhow!("Didn't find entity")),
109 }
110 }
111
112 pub async fn save_entity<T>(
113 &self,
114 prefix: &Prefix,
115 key: &Key,
116 value: &T,
117 expiry: Option<u64>,
118 ) -> anyhow::Result<()>
119 where
120 T: DeserializeOwned + Serialize,
121 {
122 let value_str = serde_json::to_string(&value)
123 .map_err(|e| anyhow!("save_entity serde_json error: {}", e))?;
124 match expiry {
125 Some(expiry) => {
126 let _: () = AsyncCommands::set_ex(
127 &mut self.connection(),
128 self.key(prefix, key),
129 value_str,
130 expiry,
131 )
132 .await?;
133 }
134 None => {
135 let _: () =
136 AsyncCommands::set(&mut self.connection(), self.key(prefix, key), value_str)
137 .await?;
138 }
139 }
140 Ok(())
141 }
142
143 pub async fn remove_entity(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<()> {
144 let _: () = AsyncCommands::del(&mut self.connection(), self.key(prefix, key)).await?;
145 Ok(())
146 }
147}