simple_redis_wrapper/client/
redis_async_client.rs

1use crate::client::types::{EvictionPolicy, Key, Namespace, Prefix};
2use anyhow::anyhow;
3use redis::aio::ConnectionManager;
4use redis::{AsyncCommands, cmd};
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use std::env;
8
9pub struct RedisAsyncClient {
10    pub url: String,
11    pub connection: ConnectionManager,
12    pub namespace: Namespace,
13}
14
15impl Clone for RedisAsyncClient {
16    fn clone(&self) -> Self {
17        Self {
18            url: self.url.clone(),
19            connection: self.connection.clone(),
20            namespace: self.namespace.clone(),
21        }
22    }
23}
24
25impl RedisAsyncClient {
26    pub async fn new(url: Option<String>, namespace: Namespace) -> anyhow::Result<Self> {
27        let url = url.unwrap_or(env::var("REDIS_URL")?);
28        let url = if url.ends_with("#insecure") {
29            url
30        } else {
31            format!("{}#insecure", url)
32        };
33        let client = redis::Client::open(url.clone())?;
34        let connection = ConnectionManager::new(client).await?;
35        // let connection = client.get_multiplexed_async_connection().await?;
36        Ok(Self {
37            url,
38            connection,
39            namespace,
40        })
41    }
42
43    pub async fn set_eviction_policy(
44        &self,
45        eviction_policy: EvictionPolicy,
46    ) -> anyhow::Result<String> {
47        let _: () = cmd("CONFIG")
48            .arg("SET")
49            .arg("maxmemory-policy")
50            .arg(eviction_policy.to_string())
51            .query_async(&mut self.connection())
52            .await?;
53        self.get_eviction_policy().await
54    }
55
56    pub async fn get_eviction_policy(&self) -> anyhow::Result<String> {
57        let current_policy: Vec<String> = cmd("CONFIG")
58            .arg("GET")
59            .arg("maxmemory-policy")
60            .query_async(&mut self.connection())
61            .await?;
62        Ok(current_policy.join(""))
63    }
64
65    pub fn key(&self, prefix: &Prefix, key: &Key) -> String {
66        format!("{}:{}:{}", self.namespace.0, prefix.0, key.0)
67    }
68
69    pub fn connection(&self) -> ConnectionManager {
70        self.connection.clone()
71    }
72
73    pub async fn get_entity<T>(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<T>
74    where
75        T: DeserializeOwned + Serialize,
76    {
77        let redis_str: Option<String> =
78            AsyncCommands::get(&mut self.connection(), self.key(prefix, key)).await?;
79        match redis_str {
80            Some(string) => {
81                let redis_entity: T = serde_json::from_str(&string)
82                    .map_err(|e| anyhow!("get_entity serde_json error: {}", e))?;
83                Ok(redis_entity)
84            }
85            None => Err(anyhow!("Didn't find entity")),
86        }
87    }
88
89    pub async fn save_entity<T>(
90        &self,
91        prefix: &Prefix,
92        key: &Key,
93        value: &T,
94        expiry: Option<u64>,
95    ) -> anyhow::Result<()>
96    where
97        T: DeserializeOwned + Serialize,
98    {
99        let value_str = serde_json::to_string(&value)
100            .map_err(|e| anyhow!("save_entity serde_json error: {}", e))?;
101        match expiry {
102            Some(expiry) => {
103                let _: () = AsyncCommands::set_ex(
104                    &mut self.connection(),
105                    self.key(prefix, key),
106                    value_str,
107                    expiry,
108                )
109                .await?;
110            }
111            None => {
112                let _: () =
113                    AsyncCommands::set(&mut self.connection(), self.key(prefix, key), value_str)
114                        .await?;
115            }
116        }
117        Ok(())
118    }
119
120    pub async fn remove_entity(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<()> {
121        let _: () = AsyncCommands::del(&mut self.connection(), self.key(prefix, key)).await?;
122        Ok(())
123    }
124}