Skip to main content

flippico_cache/providers/
redis.rs

1use futures_util::stream::StreamExt;
2use log::info;
3use redis::{Client, Commands, RedisResult};
4use serde_json;
5use std::future::Future;
6use std::pin::Pin;
7
8use crate::types::{
9    channels::{ChannelMessage, ListChannel, ListMessage, SubscriptionChannel},
10    provider::{
11        AsyncCallback, CacheProvider, Connectable, FifoProvider, ProviderTrait, PubSubProvider,
12        PubSubProviderError, SortedSetProvider,
13    },
14};
15
16pub struct Redis {
17    pub url: String,
18    pub client: Option<Client>,
19}
20
21impl ProviderTrait for Redis {
22    fn new(url: String) -> Self {
23        Redis { url, client: None }
24    }
25}
26
27impl Connectable for Redis {
28    fn set_client(&mut self) {
29        let client = redis::Client::open(self.url.clone()).unwrap();
30        self.client = Some(client);
31    }
32
33    fn get_connection(&self) -> &Client {
34        self.client.as_ref().unwrap()
35    }
36
37    fn get_mut_connection(&mut self) -> &mut Client {
38        self.client.as_mut().unwrap()
39    }
40}
41
42impl PubSubProvider for Redis
43where
44    Redis: Connectable,
45{
46    type Channels = SubscriptionChannel;
47
48    fn subscribe(
49        &self,
50        callback: AsyncCallback<ChannelMessage>,
51        channel: Self::Channels,
52    ) -> Pin<Box<dyn Future<Output = Result<(), PubSubProviderError>> + Send + '_>> {
53        Box::pin(async move {
54            let client = self.get_connection();
55            let mut subscriber = client
56                .get_async_pubsub()
57                .await
58                .map_err(|_| "Failed to get async pubsub")?;
59            subscriber
60                .subscribe(&[channel.get_channel()])
61                .await
62                .map_err(|_| "Failed to subscribe to channel")?;
63            let mut stream = subscriber.into_on_message();
64            while let Some(msg) = stream.next().await {
65                let payload = msg.get_payload::<String>();
66                if let Ok(json_str) = payload {
67                    if let Ok(channel_msg) = serde_json::from_str::<ChannelMessage>(&json_str) {
68                        info!(
69                            "[{}] New message: {}",
70                            channel_msg.channel.get_channel(),
71                            &json_str
72                        );
73                        callback(channel_msg).await;
74                    }
75                }
76            }
77
78            Ok(())
79        })
80    }
81
82    fn publish(&mut self, channel: Self::Channels, message: ChannelMessage) {
83        let client = self.get_mut_connection();
84        if let Ok(json_str) = serde_json::to_string(&message) {
85            let _ = client.publish::<&str, String, String>(channel.get_channel(), json_str);
86        }
87    }
88}
89
90impl FifoProvider for Redis
91where
92    Redis: Connectable,
93{
94    fn pop(&self, channel_name: ListChannel, job_key: &str) -> Result<ListMessage, &'static str> {
95        let list_name = self.get_list_name(channel_name.get_channel(), job_key);
96        let client = self.get_connection();
97        let mut client = client.clone();
98
99        let result = client
100            .brpop(&list_name, 0.0)
101            .map(|mut items: Vec<String>| items.pop().unwrap_or_default());
102
103        match result {
104            Ok(json_str) => serde_json::from_str::<serde_json::Value>(&json_str)
105                .map(|value| ListMessage {
106                    meta: None,
107                    body: Some(value),
108                })
109                .map_err(|_| "Failed to deserialize list message"),
110            Err(_) => Err("Failed to pop from list"),
111        }
112    }
113
114    fn push(&self, channel_name: ListChannel, job_key: &str, job_payload: serde_json::Value) {
115        let client: &Client = self.get_connection();
116        let mut client = client.clone();
117        if let Ok(json_str) = serde_json::to_string(&job_payload) {
118            let _: () = client
119                .rpush(
120                    self.get_list_name(channel_name.get_channel(), job_key),
121                    json_str,
122                )
123                .unwrap();
124        }
125    }
126}
127
128impl CacheProvider for Redis
129where
130    Redis: Connectable,
131{
132    fn get(
133        &self,
134        cache_space: crate::types::channels::CacheSpace,
135        key: &str,
136        app_name: Option<String>,
137    ) -> String {
138        let client: &Client = self.get_connection();
139        let mut client = client.clone();
140        let redis_key = self.get_key_name(cache_space, key, app_name);
141        let result: RedisResult<String> = client.get(&redis_key);
142        match result {
143            Ok(value) => value,
144            Err(_) => String::new(),
145        }
146    }
147
148    fn set(
149        &self,
150        cache_space: crate::types::channels::CacheSpace,
151        key: &str,
152        value: serde_json::Value,
153        app_name: Option<String>,
154    ) {
155        let client: &Client = self.get_connection();
156        let mut client = client.clone();
157        let redis_key = self.get_key_name(cache_space, key, app_name);
158        let cache_value = self.get_value(None, value);
159        if let Ok(json_str) = serde_json::to_string(&cache_value) {
160            let _: () = client.set(&redis_key, json_str).unwrap();
161        }
162    }
163
164    fn delete(
165        &self,
166        cache_space: crate::types::channels::CacheSpace,
167        key: &str,
168        app_name: Option<String>,
169    ) {
170        let client: &Client = self.get_connection();
171        let mut client = client.clone();
172        let redis_key = self.get_key_name(cache_space, key, app_name);
173        let _: () = client.del(&redis_key).unwrap();
174    }
175
176    fn set_with_ttl(
177        &self,
178        cache_space: crate::types::channels::CacheSpace,
179        key: &str,
180        value: serde_json::Value,
181        app_name: Option<String>,
182        ttl_secs: u64,
183    ) {
184        let client: &Client = self.get_connection();
185        let mut client = client.clone();
186        let redis_key = self.get_key_name(cache_space, key, app_name);
187        let cache_value = self.get_value(None, value);
188        if let Ok(json_str) = serde_json::to_string(&cache_value) {
189            let _: () = client.set_ex(&redis_key, json_str, ttl_secs).unwrap();
190        }
191    }
192}
193
194impl SortedSetProvider for Redis
195where
196    Redis: Connectable,
197{
198    fn zadd(&self, key: &str, score: f64, member: &str) -> Result<u32, &'static str> {
199        let client = self.get_connection();
200        let mut client = client.clone();
201        client
202            .zadd(key, member, score)
203            .map_err(|_| "Failed to ZADD")
204    }
205
206    fn zremrangebyscore(&self, key: &str, min: f64, max: f64) -> Result<u32, &'static str> {
207        let client = self.get_connection();
208        let mut client = client.clone();
209        client
210            .zrembyscore(key, min, max)
211            .map_err(|_| "Failed to ZREMRANGEBYSCORE")
212    }
213
214    fn zcard(&self, key: &str) -> Result<u32, &'static str> {
215        let client = self.get_connection();
216        let mut client = client.clone();
217        client.zcard(key).map_err(|_| "Failed to ZCARD")
218    }
219
220    fn zrangebyscore_withscores(
221        &self,
222        key: &str,
223        min: f64,
224        max: f64,
225        limit: Option<usize>,
226    ) -> Result<Vec<(String, f64)>, &'static str> {
227        let client = self.get_connection();
228        let mut client = client.clone();
229        let result: RedisResult<Vec<(String, f64)>> = if let Some(count) = limit {
230            redis::cmd("ZRANGEBYSCORE")
231                .arg(key)
232                .arg(min)
233                .arg(max)
234                .arg("WITHSCORES")
235                .arg("LIMIT")
236                .arg(0)
237                .arg(count)
238                .query(&mut client)
239        } else {
240            redis::cmd("ZRANGEBYSCORE")
241                .arg(key)
242                .arg(min)
243                .arg(max)
244                .arg("WITHSCORES")
245                .query(&mut client)
246        };
247        result.map_err(|_| "Failed to ZRANGEBYSCORE")
248    }
249
250    fn expire(&self, key: &str, ttl_secs: u64) -> Result<(), &'static str> {
251        let client = self.get_connection();
252        let mut client = client.clone();
253        redis::cmd("EXPIRE")
254            .arg(key)
255            .arg(ttl_secs)
256            .query::<()>(&mut client)
257            .map_err(|_| "Failed to EXPIRE")
258    }
259
260    fn eval_script(
261        &self,
262        script: &str,
263        keys: &[&str],
264        args: &[&str],
265    ) -> Result<Vec<i64>, &'static str> {
266        let client = self.get_connection();
267        let mut client = client.clone();
268        let cmd = redis::Script::new(script);
269        let mut invocation = cmd.prepare_invoke();
270        for k in keys {
271            invocation.key(*k);
272        }
273        for a in args {
274            invocation.arg(*a);
275        }
276        invocation
277            .invoke::<Vec<i64>>(&mut client)
278            .map_err(|_| "Failed to evaluate Lua script")
279    }
280}