simple_redis_wrapper/client/
redis_async_client.rs1use crate::client::types::{EvictionPolicy, Key, Namespace, Prefix};
2use anyhow::anyhow;
3use redis::aio::ConnectionManager;
4use redis::{AsyncCommands, cmd};
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use std::env;
8
9pub struct RedisAsyncClient {
10 pub url: String,
11 pub connection: ConnectionManager,
12 pub namespace: Namespace,
13}
14
15impl Clone for RedisAsyncClient {
16 fn clone(&self) -> Self {
17 Self {
18 url: self.url.clone(),
19 connection: self.connection.clone(),
20 namespace: self.namespace.clone(),
21 }
22 }
23}
24
25impl RedisAsyncClient {
26 pub async fn new(url: Option<String>, namespace: Namespace) -> anyhow::Result<Self> {
27 let url = url.unwrap_or(env::var("REDIS_URL")?);
28 let url = if url.ends_with("#insecure") {
29 url
30 } else {
31 format!("{}#insecure", url)
32 };
33 let client = redis::Client::open(url.clone())?;
34 let connection = ConnectionManager::new(client).await?;
35 Ok(Self {
37 url,
38 connection,
39 namespace,
40 })
41 }
42
43 pub async fn set_eviction_policy(
44 &self,
45 eviction_policy: EvictionPolicy,
46 ) -> anyhow::Result<String> {
47 let _: () = cmd("CONFIG")
48 .arg("SET")
49 .arg("maxmemory-policy")
50 .arg(eviction_policy.to_string())
51 .query_async(&mut self.connection())
52 .await?;
53 self.get_eviction_policy().await
54 }
55
56 pub async fn get_eviction_policy(&self) -> anyhow::Result<String> {
57 let current_policy: Vec<String> = cmd("CONFIG")
58 .arg("GET")
59 .arg("maxmemory-policy")
60 .query_async(&mut self.connection())
61 .await?;
62 Ok(current_policy.join(""))
63 }
64
65 pub fn key(&self, prefix: &Prefix, key: &Key) -> String {
66 format!("{}:{}:{}", self.namespace.0, prefix.0, key.0)
67 }
68
69 pub fn connection(&self) -> ConnectionManager {
70 self.connection.clone()
71 }
72
73 pub async fn get_entity<T>(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<T>
74 where
75 T: DeserializeOwned + Serialize,
76 {
77 let redis_str: Option<String> =
78 AsyncCommands::get(&mut self.connection(), self.key(prefix, key)).await?;
79 match redis_str {
80 Some(string) => {
81 let redis_entity: T = serde_json::from_str(&string)
82 .map_err(|e| anyhow!("get_entity serde_json error: {}", e))?;
83 Ok(redis_entity)
84 }
85 None => Err(anyhow!("Didn't find entity")),
86 }
87 }
88
89 pub async fn save_entity<T>(
90 &self,
91 prefix: &Prefix,
92 key: &Key,
93 value: &T,
94 expiry: Option<u64>,
95 ) -> anyhow::Result<()>
96 where
97 T: DeserializeOwned + Serialize,
98 {
99 let value_str = serde_json::to_string(&value)
100 .map_err(|e| anyhow!("save_entity serde_json error: {}", e))?;
101 match expiry {
102 Some(expiry) => {
103 let _: () = AsyncCommands::set_ex(
104 &mut self.connection(),
105 self.key(prefix, key),
106 value_str,
107 expiry,
108 )
109 .await?;
110 }
111 None => {
112 let _: () =
113 AsyncCommands::set(&mut self.connection(), self.key(prefix, key), value_str)
114 .await?;
115 }
116 }
117 Ok(())
118 }
119
120 pub async fn remove_entity(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<()> {
121 let _: () = AsyncCommands::del(&mut self.connection(), self.key(prefix, key)).await?;
122 Ok(())
123 }
124}