flippico_cache/providers/
redis.rs1use futures_util::stream::StreamExt;
2use log::info;
3use redis::{Client, Commands, RedisResult};
4use serde_json;
5use std::future::Future;
6use std::pin::Pin;
7
8use crate::types::{
9 channels::{ChannelMessage, ListChannel, ListMessage, SubscriptionChannel},
10 provider::{
11 AsyncCallback, CacheProvider, Connectable, FifoProvider, ProviderTrait, PubSubProvider,
12 PubSubProviderError,
13 },
14};
15
16pub struct Redis {
17 pub url: String,
18 pub client: Option<Client>,
19}
20
21impl ProviderTrait for Redis {
22 fn new(url: String) -> Self {
23 Redis { url, client: None }
24 }
25}
26
27impl Connectable for Redis {
28 fn set_client(&mut self) {
29 let client = redis::Client::open(self.url.clone()).unwrap();
30 self.client = Some(client);
31 }
32
33 fn get_connection(&self) -> &Client {
34 self.client.as_ref().unwrap()
35 }
36
37 fn get_mut_connection(&mut self) -> &mut Client {
38 self.client.as_mut().unwrap()
39 }
40}
41
42impl PubSubProvider for Redis
43where
44 Redis: Connectable,
45{
46 type Channels = SubscriptionChannel;
47
48 fn subscribe(
49 &self,
50 callback: AsyncCallback<ChannelMessage>,
51 channel: Self::Channels,
52 ) -> Pin<Box<dyn Future<Output = Result<(), PubSubProviderError>> + Send + '_>> {
53 Box::pin(async move {
54 let client = self.get_connection();
55 let mut subscriber = client
56 .get_async_pubsub()
57 .await
58 .map_err(|_| "Failed to get async pubsub")?;
59 subscriber
60 .subscribe(&[channel.get_channel()])
61 .await
62 .map_err(|_| "Failed to subscribe to channel")?;
63 let mut stream = subscriber.into_on_message();
64 while let Some(msg) = stream.next().await {
65 let payload = msg.get_payload::<String>();
66 if let Ok(json_str) = payload {
67 if let Ok(channel_msg) = serde_json::from_str::<ChannelMessage>(&json_str) {
68 info!(
69 "[{}] New message: {}",
70 channel_msg.channel.get_channel(),
71 &json_str
72 );
73 callback(channel_msg).await;
74 }
75 }
76 }
77
78 Ok(())
79 })
80 }
81
82 fn publish(&mut self, channel: Self::Channels, message: ChannelMessage) {
83 let client = self.get_mut_connection();
84 if let Ok(json_str) = serde_json::to_string(&message) {
85 let _ = client.publish::<&str, String, String>(channel.get_channel(), json_str);
86 }
87 }
88}
89
90impl FifoProvider for Redis
91where
92 Redis: Connectable,
93{
94 fn pop(
95 &self,
96 channel_name: ListChannel,
97 job_key: &str,
98 callback: AsyncCallback<ListMessage>,
99 ) -> Pin<Box<dyn Future<Output = Result<(), PubSubProviderError>> + Send + '_>> {
100 let list_name = self.get_list_name(channel_name.get_channel(), job_key);
101
102 Box::pin(async move {
103 let client: &Client = self.get_connection();
104 let mut client = client.clone();
105
106 loop {
107 let result = client
108 .blpop(&list_name, 0.0)
109 .map(|mut items: Vec<String>| items.pop().unwrap_or_default());
110
111 match result {
112 Ok(str) => {
113 if let Ok(channel_msg) = serde_json::from_str::<ListMessage>(&str) {
114 info!("Popped message: {}", &str);
115 callback(channel_msg).await;
116 }
117 }
118 Err(err) => {
119 info!("Error popping from list {}: {}", list_name, err);
120 }
121 }
122 }
123 })
124 }
125
126 fn push(&self, channel_name: ListChannel, job_key: &str, job_payload: serde_json::Value) {
127 let client: &Client = self.get_connection();
128 let mut client = client.clone();
129 if let Ok(json_str) = serde_json::to_string(&job_payload) {
130 let _: () = client
131 .rpush(
132 self.get_list_name(channel_name.get_channel(), job_key),
133 json_str,
134 )
135 .unwrap();
136 }
137 }
138}
139
140impl CacheProvider for Redis
141where
142 Redis: Connectable,
143{
144 fn get(
145 &self,
146 cache_space: crate::types::channels::CacheSpace,
147 key: &str,
148 app_name: Option<String>,
149 ) -> String {
150 let client: &Client = self.get_connection();
151 let mut client = client.clone();
152 let redis_key = self.get_key_name(cache_space, key, app_name);
153 let result: RedisResult<String> = client.get(&redis_key);
154 match result {
155 Ok(value) => value,
156 Err(_) => String::new(),
157 }
158 }
159
160 fn set(
161 &self,
162 cache_space: crate::types::channels::CacheSpace,
163 key: &str,
164 value: serde_json::Value,
165 app_name: Option<String>,
166 ) {
167 let client: &Client = self.get_connection();
168 let mut client = client.clone();
169 let redis_key = self.get_key_name(cache_space, key, app_name);
170 let cache_value = self.get_value(None, value);
171 if let Ok(json_str) = serde_json::to_string(&cache_value) {
172 let _: () = client.set(&redis_key, json_str).unwrap();
173 }
174 }
175
176 fn delete(
177 &self,
178 cache_space: crate::types::channels::CacheSpace,
179 key: &str,
180 app_name: Option<String>,
181 ) {
182 let client: &Client = self.get_connection();
183 let mut client = client.clone();
184 let redis_key = self.get_key_name(cache_space, key, app_name);
185 let _: () = client.del(&redis_key).unwrap();
186 }
187}