simple_redis_wrapper/client/
redis_async_client.rs

1use crate::client::types::{EvictionPolicy, Key, Namespace, Prefix};
2use anyhow::anyhow;
3use redis::{AsyncCommands, cmd};
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6use std::env;
7
8pub struct RedisAsyncClient {
9    pub url: String,
10    pub client: redis::Client,
11    pub connection: redis::aio::MultiplexedConnection,
12    pub namespace: Namespace,
13}
14
15impl RedisAsyncClient {
16    pub async fn new(url: Option<String>, namespace: Namespace) -> anyhow::Result<Self> {
17        let url = url.unwrap_or(env::var("REDIS_URL")?);
18        let client = redis::Client::open(url.clone())?;
19        let connection = client.get_multiplexed_async_connection().await?;
20        Ok(Self {
21            url,
22            client,
23            connection,
24            namespace,
25        })
26    }
27
28    pub async fn set_eviction_policy(
29        &self,
30        eviction_policy: EvictionPolicy,
31    ) -> anyhow::Result<String> {
32        let _: () = cmd("CONFIG")
33            .arg("SET")
34            .arg("maxmemory-policy")
35            .arg(eviction_policy.to_string())
36            .query_async(&mut self.connection())
37            .await?;
38        self.get_eviction_policy().await
39    }
40
41    pub async fn get_eviction_policy(&self) -> anyhow::Result<String> {
42        let current_policy: Vec<String> = cmd("CONFIG")
43            .arg("GET")
44            .arg("maxmemory-policy")
45            .query_async(&mut self.connection())
46            .await?;
47        Ok(current_policy.join(""))
48    }
49
50    pub fn key(&self, prefix: &Prefix, key: &Key) -> String {
51        format!("{}:{}:{}", self.namespace.0, prefix.0, key.0)
52    }
53
54    pub fn connection(&self) -> redis::aio::MultiplexedConnection {
55        self.connection.clone()
56    }
57
58    pub async fn get_entity<T>(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<T>
59    where
60        T: DeserializeOwned + Serialize,
61    {
62        let redis_str: Option<String> = self.connection().get(self.key(prefix, key)).await?;
63        match redis_str {
64            Some(string) => {
65                let redis_entity: T = serde_json::from_str(&string)
66                    .map_err(|e| anyhow!("get_entity serde_json error: {}", e))?;
67                Ok(redis_entity)
68            }
69            None => Err(anyhow!("Didn't find entity")),
70        }
71    }
72
73    pub async fn save_entity<T>(
74        &self,
75        prefix: &Prefix,
76        key: &Key,
77        value: &T,
78        expiry: Option<u64>,
79    ) -> anyhow::Result<()>
80    where
81        T: DeserializeOwned + Serialize,
82    {
83        let value_str = serde_json::to_string(&value)
84            .map_err(|e| anyhow!("save_entity serde_json error: {}", e))?;
85        match expiry {
86            Some(expiry) => {
87                let _: () = self
88                    .connection()
89                    .set_ex(self.key(prefix, key), value_str, expiry)
90                    .await?;
91            }
92            None => {
93                let _: () = self
94                    .connection()
95                    .set(self.key(prefix, key), value_str)
96                    .await?;
97            }
98        }
99        Ok(())
100    }
101
102    pub async fn remove_entity(&self, prefix: &Prefix, key: &Key) -> anyhow::Result<()> {
103        let _: () = self.connection().del(self.key(prefix, key)).await?;
104        Ok(())
105    }
106}