1use redis::Client;
2
3use crate::{
4 providers::redis::Redis,
5 types::channels::{
6 CacheSpace, CacheValue, ChannelMessage, ListChannel, ListMessage, MessageMeta,
7 SubscriptionChannel,
8 },
9};
10use dotenv::dotenv;
11use std::env;
12use std::future::Future;
13use std::pin::Pin;
14
15pub trait ProviderTrait {
16 fn new(url: String) -> Self
17 where
18 Self: Sized;
19}
20
21pub trait Connectable {
22 fn set_client(&mut self);
23 fn get_connection(&self) -> &Client;
24 fn get_mut_connection(&mut self) -> &mut Client;
25}
26
27pub type PubSubProviderError = &'static str;
28
29pub type AsyncCallback<T> =
30 Box<dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> + Send + Sync>;
31
32pub trait PubSubProvider {
33 type Channels;
34 fn subscribe(
35 &self,
36 callback: AsyncCallback<ChannelMessage>,
37 channel: Self::Channels,
38 ) -> Pin<Box<dyn Future<Output = Result<(), &'static str>> + Send + '_>>;
39 fn publish(&mut self, channel: Self::Channels, message: ChannelMessage);
40}
41
42pub trait FifoProvider {
43 fn push(&self, channel_name: ListChannel, job_key: &str, job_payload: serde_json::Value);
44 fn pop(&self, channel_name: ListChannel, job_key: &str) -> Result<ListMessage, &'static str>;
45
46 fn get_list_name(&self, channel_name: &str, job_key: &str) -> String {
47 format!("{}:{}", channel_name, job_key)
48 }
49}
50
51pub trait CacheProvider {
52 fn get(&self, cache_space: CacheSpace, key: &str, app_name: Option<String>) -> String;
53 fn delete(&self, cache_space: CacheSpace, key: &str, app_name: Option<String>);
54 fn set(
55 &self,
56 cache_space: CacheSpace,
57 key: &str,
58 value: serde_json::Value,
59 app_name: Option<String>,
60 );
61 fn set_with_ttl(
63 &self,
64 cache_space: CacheSpace,
65 key: &str,
66 value: serde_json::Value,
67 app_name: Option<String>,
68 ttl_secs: u64,
69 ) {
70 self.set(cache_space, key, value, app_name);
71 let _ = ttl_secs;
72 }
73
74 fn get_key_name(&self, cache_space: CacheSpace, key: &str, app_name: Option<String>) -> String {
75 if app_name.is_none() {
76 return format!("{}:{}", cache_space.get_space(), key);
77 }
78
79 format!("{}:{}:{}", cache_space.get_space(), app_name.unwrap(), key)
80 }
81
82 fn get_value(
83 &self,
84 app_name: Option<String>,
85 value: serde_json::Value,
86 ) -> CacheValue<serde_json::Value> {
87 if app_name.is_none() {
88 return CacheValue {
89 meta: None,
90 value: Some(value),
91 };
92 }
93
94 CacheValue {
95 meta: Some(MessageMeta {
96 app_name: app_name.unwrap(),
97 }),
98 value: Some(value),
99 }
100 }
101}
102
103pub trait SortedSetProvider {
105 fn zadd(&self, key: &str, score: f64, member: &str) -> Result<u32, &'static str>;
107
108 fn zremrangebyscore(&self, key: &str, min: f64, max: f64) -> Result<u32, &'static str>;
111
112 fn zcard(&self, key: &str) -> Result<u32, &'static str>;
114
115 fn zrangebyscore_withscores(
118 &self,
119 key: &str,
120 min: f64,
121 max: f64,
122 limit: Option<usize>,
123 ) -> Result<Vec<(String, f64)>, &'static str>;
124
125 fn expire(&self, key: &str, ttl_secs: u64) -> Result<(), &'static str>;
127
128 fn eval_script(
130 &self,
131 script: &str,
132 keys: &[&str],
133 args: &[&str],
134 ) -> Result<Vec<i64>, &'static str>;
135}
136
137pub trait ConnectableProvider: ProviderTrait + Connectable {}
138impl<T> ConnectableProvider for T where T: ProviderTrait + Connectable {}
139
140pub trait ConnectablePubSubProvider: ConnectableProvider + PubSubProvider {}
141impl<T> ConnectablePubSubProvider for T where T: ConnectableProvider + PubSubProvider {}
142
143pub trait ConnectableFifoProvider: ConnectableProvider + FifoProvider {}
144impl<T> ConnectableFifoProvider for T where T: ConnectableProvider + FifoProvider {}
145
146pub trait ConnectableCacheProvider: ConnectableProvider + CacheProvider {}
147impl<T> ConnectableCacheProvider for T where T: ConnectableProvider + CacheProvider {}
148
149pub trait ConnectableSortedSetProvider: ConnectableProvider + SortedSetProvider {}
150impl<T> ConnectableSortedSetProvider for T where T: ConnectableProvider + SortedSetProvider {}
151
152pub enum Provider {
153 Redis,
154}
155
156pub enum ProviderEither {
157 Redis(Box<Redis>),
158}
159
160impl ProviderEither {
161 pub fn pub_sub(
162 self,
163 ) -> Result<
164 Box<dyn ConnectablePubSubProvider<Channels = SubscriptionChannel> + Send + Sync>,
165 &'static str,
166 > {
167 match self {
168 ProviderEither::Redis(provider) => Ok(provider
169 as Box<
170 dyn ConnectablePubSubProvider<Channels = SubscriptionChannel> + Send + Sync,
171 >),
172 }
173 }
174
175 pub fn connectable(self) -> Result<Box<dyn ConnectableProvider + Send + Sync>, &'static str> {
176 match self {
177 ProviderEither::Redis(provider) => {
178 Ok(provider as Box<dyn ConnectableProvider + Send + Sync>)
179 }
180 }
181 }
182
183 pub fn fifo(self) -> Result<Box<dyn ConnectableFifoProvider + Send + Sync>, &'static str> {
184 match self {
185 ProviderEither::Redis(provider) => {
186 Ok(provider as Box<dyn ConnectableFifoProvider + Send + Sync>)
187 }
188 }
189 }
190
191 pub fn cache(self) -> Result<Box<dyn ConnectableCacheProvider + Send + Sync>, &'static str> {
192 match self {
193 ProviderEither::Redis(provider) => {
194 Ok(provider as Box<dyn ConnectableCacheProvider + Send + Sync>)
195 }
196 }
197 }
198
199 pub fn sorted_set(
200 self,
201 ) -> Result<Box<dyn ConnectableSortedSetProvider + Send + Sync>, &'static str> {
202 match self {
203 ProviderEither::Redis(provider) => {
204 Ok(provider as Box<dyn ConnectableSortedSetProvider + Send + Sync>)
205 }
206 }
207 }
208}
209
210impl Provider {
211 pub(crate) fn create_provider(&self) -> ProviderEither {
212 match self {
213 Provider::Redis => {
214 dotenv().ok();
215 let url = env::var("FLIPPICO_CACHE_REDIS_URL");
216 if let Ok(url) = url {
217 let mut provider = Redis::new(url);
218 provider.set_client();
219 ProviderEither::Redis(Box::new(provider))
220 } else {
221 log::error!("FLIPPICO_CACHE_REDIS_URL is not set");
222 panic!("FLIPPICO_CACHE_REDIS_URL is not set");
223 }
224 }
225 }
226 }
227}