flippico_cache/providers/
redis.rs1use futures_util::stream::StreamExt;
2use log::info;
3use redis::{Client, Commands, RedisResult};
4use serde_json;
5use std::future::Future;
6use std::pin::Pin;
7
8use crate::types::{
9 channels::{ChannelMessage, ListChannel, ListMessage, SubscriptionChannel},
10 provider::{
11 AsyncCallback, CacheProvider, Connectable, FifoProvider, ProviderTrait, PubSubProvider,
12 PubSubProviderError, SortedSetProvider,
13 },
14};
15
16pub struct Redis {
17 pub url: String,
18 pub client: Option<Client>,
19}
20
21impl ProviderTrait for Redis {
22 fn new(url: String) -> Self {
23 Redis { url, client: None }
24 }
25}
26
27impl Connectable for Redis {
28 fn set_client(&mut self) {
29 let client = redis::Client::open(self.url.clone()).unwrap();
30 self.client = Some(client);
31 }
32
33 fn get_connection(&self) -> &Client {
34 self.client.as_ref().unwrap()
35 }
36
37 fn get_mut_connection(&mut self) -> &mut Client {
38 self.client.as_mut().unwrap()
39 }
40}
41
42impl PubSubProvider for Redis
43where
44 Redis: Connectable,
45{
46 type Channels = SubscriptionChannel;
47
48 fn subscribe(
49 &self,
50 callback: AsyncCallback<ChannelMessage>,
51 channel: Self::Channels,
52 ) -> Pin<Box<dyn Future<Output = Result<(), PubSubProviderError>> + Send + '_>> {
53 Box::pin(async move {
54 let client = self.get_connection();
55 let mut subscriber = client
56 .get_async_pubsub()
57 .await
58 .map_err(|_| "Failed to get async pubsub")?;
59 subscriber
60 .subscribe(&[channel.get_channel()])
61 .await
62 .map_err(|_| "Failed to subscribe to channel")?;
63 let mut stream = subscriber.into_on_message();
64 while let Some(msg) = stream.next().await {
65 let payload = msg.get_payload::<String>();
66 if let Ok(json_str) = payload {
67 if let Ok(channel_msg) = serde_json::from_str::<ChannelMessage>(&json_str) {
68 info!(
69 "[{}] New message: {}",
70 channel_msg.channel.get_channel(),
71 &json_str
72 );
73 callback(channel_msg).await;
74 }
75 }
76 }
77
78 Ok(())
79 })
80 }
81
82 fn publish(&mut self, channel: Self::Channels, message: ChannelMessage) {
83 let client = self.get_mut_connection();
84 if let Ok(json_str) = serde_json::to_string(&message) {
85 let _ = client.publish::<&str, String, String>(channel.get_channel(), json_str);
86 }
87 }
88}
89
90impl FifoProvider for Redis
91where
92 Redis: Connectable,
93{
94 fn pop(&self, channel_name: ListChannel, job_key: &str) -> Result<ListMessage, &'static str> {
95 let list_name = self.get_list_name(channel_name.get_channel(), job_key);
96 let client = self.get_connection();
97 let mut client = client.clone();
98
99 let result = client
100 .brpop(&list_name, 0.0)
101 .map(|mut items: Vec<String>| items.pop().unwrap_or_default());
102
103 match result {
104 Ok(json_str) => serde_json::from_str::<serde_json::Value>(&json_str)
105 .map(|value| ListMessage {
106 meta: None,
107 body: Some(value),
108 })
109 .map_err(|_| "Failed to deserialize list message"),
110 Err(_) => Err("Failed to pop from list"),
111 }
112 }
113
114 fn push(&self, channel_name: ListChannel, job_key: &str, job_payload: serde_json::Value) {
115 let client: &Client = self.get_connection();
116 let mut client = client.clone();
117 if let Ok(json_str) = serde_json::to_string(&job_payload) {
118 let _: () = client
119 .rpush(
120 self.get_list_name(channel_name.get_channel(), job_key),
121 json_str,
122 )
123 .unwrap();
124 }
125 }
126}
127
128impl CacheProvider for Redis
129where
130 Redis: Connectable,
131{
132 fn get(
133 &self,
134 cache_space: crate::types::channels::CacheSpace,
135 key: &str,
136 app_name: Option<String>,
137 ) -> String {
138 let client: &Client = self.get_connection();
139 let mut client = client.clone();
140 let redis_key = self.get_key_name(cache_space, key, app_name);
141 let result: RedisResult<String> = client.get(&redis_key);
142 match result {
143 Ok(value) => value,
144 Err(_) => String::new(),
145 }
146 }
147
148 fn set(
149 &self,
150 cache_space: crate::types::channels::CacheSpace,
151 key: &str,
152 value: serde_json::Value,
153 app_name: Option<String>,
154 ) {
155 let client: &Client = self.get_connection();
156 let mut client = client.clone();
157 let redis_key = self.get_key_name(cache_space, key, app_name);
158 let cache_value = self.get_value(None, value);
159 if let Ok(json_str) = serde_json::to_string(&cache_value) {
160 let _: () = client.set(&redis_key, json_str).unwrap();
161 }
162 }
163
164 fn delete(
165 &self,
166 cache_space: crate::types::channels::CacheSpace,
167 key: &str,
168 app_name: Option<String>,
169 ) {
170 let client: &Client = self.get_connection();
171 let mut client = client.clone();
172 let redis_key = self.get_key_name(cache_space, key, app_name);
173 let _: () = client.del(&redis_key).unwrap();
174 }
175
176 fn set_with_ttl(
177 &self,
178 cache_space: crate::types::channels::CacheSpace,
179 key: &str,
180 value: serde_json::Value,
181 app_name: Option<String>,
182 ttl_secs: u64,
183 ) {
184 let client: &Client = self.get_connection();
185 let mut client = client.clone();
186 let redis_key = self.get_key_name(cache_space, key, app_name);
187 let cache_value = self.get_value(None, value);
188 if let Ok(json_str) = serde_json::to_string(&cache_value) {
189 let _: () = client.set_ex(&redis_key, json_str, ttl_secs).unwrap();
190 }
191 }
192}
193
194impl SortedSetProvider for Redis
195where
196 Redis: Connectable,
197{
198 fn zadd(&self, key: &str, score: f64, member: &str) -> Result<u32, &'static str> {
199 let client = self.get_connection();
200 let mut client = client.clone();
201 client
202 .zadd(key, member, score)
203 .map_err(|_| "Failed to ZADD")
204 }
205
206 fn zremrangebyscore(&self, key: &str, min: f64, max: f64) -> Result<u32, &'static str> {
207 let client = self.get_connection();
208 let mut client = client.clone();
209 client
210 .zrembyscore(key, min, max)
211 .map_err(|_| "Failed to ZREMRANGEBYSCORE")
212 }
213
214 fn zcard(&self, key: &str) -> Result<u32, &'static str> {
215 let client = self.get_connection();
216 let mut client = client.clone();
217 client.zcard(key).map_err(|_| "Failed to ZCARD")
218 }
219
220 fn zrangebyscore_withscores(
221 &self,
222 key: &str,
223 min: f64,
224 max: f64,
225 limit: Option<usize>,
226 ) -> Result<Vec<(String, f64)>, &'static str> {
227 let client = self.get_connection();
228 let mut client = client.clone();
229 let result: RedisResult<Vec<(String, f64)>> = if let Some(count) = limit {
230 redis::cmd("ZRANGEBYSCORE")
231 .arg(key)
232 .arg(min)
233 .arg(max)
234 .arg("WITHSCORES")
235 .arg("LIMIT")
236 .arg(0)
237 .arg(count)
238 .query(&mut client)
239 } else {
240 redis::cmd("ZRANGEBYSCORE")
241 .arg(key)
242 .arg(min)
243 .arg(max)
244 .arg("WITHSCORES")
245 .query(&mut client)
246 };
247 result.map_err(|_| "Failed to ZRANGEBYSCORE")
248 }
249
250 fn expire(&self, key: &str, ttl_secs: u64) -> Result<(), &'static str> {
251 let client = self.get_connection();
252 let mut client = client.clone();
253 redis::cmd("EXPIRE")
254 .arg(key)
255 .arg(ttl_secs)
256 .query::<()>(&mut client)
257 .map_err(|_| "Failed to EXPIRE")
258 }
259
260 fn eval_script(
261 &self,
262 script: &str,
263 keys: &[&str],
264 args: &[&str],
265 ) -> Result<Vec<i64>, &'static str> {
266 let client = self.get_connection();
267 let mut client = client.clone();
268 let cmd = redis::Script::new(script);
269 let mut invocation = cmd.prepare_invoke();
270 for k in keys {
271 invocation.key(*k);
272 }
273 for a in args {
274 invocation.arg(*a);
275 }
276 invocation
277 .invoke::<Vec<i64>>(&mut client)
278 .map_err(|_| "Failed to evaluate Lua script")
279 }
280}