simple_redis_wrapper/client/
redis_async_client.rs1use crate::client::types::{EvictionPolicy, Key, Namespace, Prefix};
2use anyhow::anyhow;
3use redis::aio::ConnectionManager;
4use redis::{AsyncCommands, cmd};
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use std::env;
8
9pub struct RedisAsyncClient {
10 pub url: String,
11 pub connection: ConnectionManager,
12 pub namespace: Namespace,
13}
14
15impl Clone for RedisAsyncClient {
16 fn clone(&self) -> Self {
17 Self {
18 url: self.url.clone(),
19 connection: self.connection.clone(),
20 namespace: self.namespace.clone(),
21 }
22 }
23}
24
25impl RedisAsyncClient {
26 pub async fn new(url: Option<String>, namespace: Namespace) -> anyhow::Result<Self> {
27 let url = url.unwrap_or(env::var("REDIS_URL")?);
28 let url = if url.ends_with("#insecure") {
29 url
30 } else {
31 format!("{}#insecure", url)
32 };
33 let client = redis::Client::open(url.clone())?;
34 let connection = ConnectionManager::new(client).await?;
35 Ok(Self {
37 url,
38 connection,
39 namespace,
40 })
41 }
42
43 pub async fn set_eviction_policy(
44 &self,
45 eviction_policy: EvictionPolicy,
46 ) -> anyhow::Result<String> {
47 let _: () = cmd("CONFIG")
48 .arg("SET")
49 .arg("maxmemory-policy")
50 .arg(eviction_policy.to_string())
51 .query_async(&mut self.connection())
52 .await?;
53 self.get_eviction_policy().await
54 }
55
56 pub async fn get_eviction_policy(&self) -> anyhow::Result<String> {
57 let current_policy: Vec<String> = cmd("CONFIG")
58 .arg("GET")
59 .arg("maxmemory-policy")
60 .query_async(&mut self.connection())
61 .await?;
62 Ok(current_policy.join(""))
63 }
64
65 pub fn key(&self, prefix: &Prefix, key: &Key) -> String {
66 format!("{}:{}:{}", self.namespace.0, prefix.0, key.0)
67 }
68
69 pub fn connection(&self) -> ConnectionManager {
70 self.connection.clone()
71 }
72
73 pub async fn get(&self, key: &str) -> anyhow::Result<Option<String>> {
74 let redis_str: Option<String> = AsyncCommands::get(&mut self.connection(), key).await?;
75 Ok(redis_str)
76 }
77
78 pub async fn set_ex(&self, key: &str, value: &str, expiry: Option<u64>) -> anyhow::Result<()> {
79 match expiry {
80 Some(expiry) => {
81 let _: () =
82 AsyncCommands::set_ex(&mut self.connection(), key, value, expiry).await?;
83 }
84 None => {
85 let _: () = AsyncCommands::set(&mut self.connection(), key, value).await?;
86 }
87 }
88 Ok(())
89 }
90
91 pub async fn get_all(&self) -> anyhow::Result<Vec<(String, String)>> {
92 let mut output: Vec<(String, String)> = Vec::new();
93 let keys: Vec<String> = AsyncCommands::keys(&mut self.connection(), "*").await?;
94 for key in keys {
95 match self.get(&key).await? {
96 Some(value) => output.push((key, value)),
97 None => {}
98 }
99 }
100 Ok(output)
101 }
102
103 pub async fn remove(&self, key: &str) -> anyhow::Result<()> {
104 let _: () = AsyncCommands::del(&mut self.connection(), key).await?;
105 Ok(())
106 }
107
108 pub async fn get_entity<T>(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<T>
109 where
110 T: DeserializeOwned + Serialize,
111 {
112 let redis_str: Option<String> =
113 AsyncCommands::get(&mut self.connection(), self.key(prefix, key)).await?;
114 match redis_str {
115 Some(string) => {
116 let redis_entity: T = serde_json::from_str(&string)
117 .map_err(|e| anyhow!("get_entity serde_json error: {}", e))?;
118 Ok(redis_entity)
119 }
120 None => Err(anyhow!("Didn't find entity")),
121 }
122 }
123
124 pub async fn save_entity<T>(
125 &self,
126 prefix: &Prefix,
127 key: &Key,
128 value: &T,
129 expiry: Option<u64>,
130 ) -> anyhow::Result<()>
131 where
132 T: DeserializeOwned + Serialize,
133 {
134 let value_str = serde_json::to_string(&value)
135 .map_err(|e| anyhow!("save_entity serde_json error: {}", e))?;
136 match expiry {
137 Some(expiry) => {
138 let _: () = AsyncCommands::set_ex(
139 &mut self.connection(),
140 self.key(prefix, key),
141 value_str,
142 expiry,
143 )
144 .await?;
145 }
146 None => {
147 let _: () =
148 AsyncCommands::set(&mut self.connection(), self.key(prefix, key), value_str)
149 .await?;
150 }
151 }
152 Ok(())
153 }
154
155 pub async fn remove_entity(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<()> {
156 let _: () = AsyncCommands::del(&mut self.connection(), self.key(prefix, key)).await?;
157 Ok(())
158 }
159}