flippico_cache/providers/
redis.rs

1use futures_util::stream::StreamExt;
2use log::info;
3use redis::{Client, Commands, RedisResult};
4use serde_json;
5use std::future::Future;
6use std::pin::Pin;
7
8use crate::types::{
9    channels::{ChannelMessage, ListChannel, ListMessage, SubscriptionChannel},
10    provider::{
11        AsyncCallback, CacheProvider, Connectable, FifoProvider, ProviderTrait, PubSubProvider,
12        PubSubProviderError,
13    },
14};
15
16pub struct Redis {
17    pub url: String,
18    pub client: Option<Client>,
19}
20
21impl ProviderTrait for Redis {
22    fn new(url: String) -> Self {
23        Redis { url, client: None }
24    }
25}
26
27impl Connectable for Redis {
28    fn set_client(&mut self) {
29        let client = redis::Client::open(self.url.clone()).unwrap();
30        self.client = Some(client);
31    }
32
33    fn get_connection(&self) -> &Client {
34        self.client.as_ref().unwrap()
35    }
36
37    fn get_mut_connection(&mut self) -> &mut Client {
38        self.client.as_mut().unwrap()
39    }
40}
41
42impl PubSubProvider for Redis
43where
44    Redis: Connectable,
45{
46    type Channels = SubscriptionChannel;
47
48    fn subscribe(
49        &self,
50        callback: AsyncCallback<ChannelMessage>,
51        channel: Self::Channels,
52    ) -> Pin<Box<dyn Future<Output = Result<(), PubSubProviderError>> + Send + '_>> {
53        Box::pin(async move {
54            let client = self.get_connection();
55            let mut subscriber = client
56                .get_async_pubsub()
57                .await
58                .map_err(|_| "Failed to get async pubsub")?;
59            subscriber
60                .subscribe(&[channel.get_channel()])
61                .await
62                .map_err(|_| "Failed to subscribe to channel")?;
63            let mut stream = subscriber.into_on_message();
64            while let Some(msg) = stream.next().await {
65                let payload = msg.get_payload::<String>();
66                if let Ok(json_str) = payload {
67                    if let Ok(channel_msg) = serde_json::from_str::<ChannelMessage>(&json_str) {
68                        info!(
69                            "[{}] New message: {}",
70                            channel_msg.channel.get_channel(),
71                            &json_str
72                        );
73                        callback(channel_msg).await;
74                    }
75                }
76            }
77
78            Ok(())
79        })
80    }
81
82    fn publish(&mut self, channel: Self::Channels, message: ChannelMessage) {
83        let client = self.get_mut_connection();
84        if let Ok(json_str) = serde_json::to_string(&message) {
85            let _ = client.publish::<&str, String, String>(channel.get_channel(), json_str);
86        }
87    }
88}
89
90impl FifoProvider for Redis
91where
92    Redis: Connectable,
93{
94    fn pop(
95        &self,
96        channel_name: ListChannel,
97        job_key: &str,
98        callback: AsyncCallback<ListMessage>,
99    ) -> Pin<Box<dyn Future<Output = Result<(), PubSubProviderError>> + Send + '_>> {
100        let list_name = self.get_list_name(channel_name.get_channel(), job_key);
101
102        Box::pin(async move {
103            let client: &Client = self.get_connection();
104            let mut client = client.clone();
105
106            loop {
107                let result = client
108                    .blpop(&list_name, 0.0)
109                    .map(|mut items: Vec<String>| items.pop().unwrap_or_default());
110
111                match result {
112                    Ok(str) => {
113                        if let Ok(channel_msg) = serde_json::from_str::<ListMessage>(&str) {
114                            info!("Popped message: {}", &str);
115                            callback(channel_msg).await;
116                        }
117                    }
118                    Err(err) => {
119                        info!("Error popping from list {}: {}", list_name, err);
120                    }
121                }
122            }
123        })
124    }
125
126    fn push(&self, channel_name: ListChannel, job_key: &str, job_payload: serde_json::Value) {
127        let client: &Client = self.get_connection();
128        let mut client = client.clone();
129        if let Ok(json_str) = serde_json::to_string(&job_payload) {
130            let _: () = client
131                .rpush(
132                    self.get_list_name(channel_name.get_channel(), job_key),
133                    json_str,
134                )
135                .unwrap();
136        }
137    }
138}
139
140impl CacheProvider for Redis
141where
142    Redis: Connectable,
143{
144    fn get(
145        &self,
146        cache_space: crate::types::channels::CacheSpace,
147        key: &str,
148        app_name: Option<String>,
149    ) -> String {
150        let client: &Client = self.get_connection();
151        let mut client = client.clone();
152        let redis_key = self.get_key_name(cache_space, key, app_name);
153        let result: RedisResult<String> = client.get(&redis_key);
154        match result {
155            Ok(value) => value,
156            Err(_) => String::new(),
157        }
158    }
159
160    fn set(
161        &self,
162        cache_space: crate::types::channels::CacheSpace,
163        key: &str,
164        value: serde_json::Value,
165        app_name: Option<String>,
166    ) {
167        let client: &Client = self.get_connection();
168        let mut client = client.clone();
169        let redis_key = self.get_key_name(cache_space, key, app_name);
170        let cache_value = self.get_value(None, value);
171        if let Ok(json_str) = serde_json::to_string(&cache_value) {
172            let _: () = client.set(&redis_key, json_str).unwrap();
173        }
174    }
175
176    fn delete(
177        &self,
178        cache_space: crate::types::channels::CacheSpace,
179        key: &str,
180        app_name: Option<String>,
181    ) {
182        let client: &Client = self.get_connection();
183        let mut client = client.clone();
184        let redis_key = self.get_key_name(cache_space, key, app_name);
185        let _: () = client.del(&redis_key).unwrap();
186    }
187}