Skip to main content

flippico_cache/types/
provider.rs

1use redis::Client;
2
3use crate::{
4    providers::redis::Redis,
5    types::channels::{
6        CacheSpace, CacheValue, ChannelMessage, ListChannel, ListMessage, MessageMeta,
7        SubscriptionChannel,
8    },
9};
10use dotenv::dotenv;
11use std::env;
12use std::future::Future;
13use std::pin::Pin;
14
/// Construction contract implemented by every provider.
pub trait ProviderTrait {
    /// Builds a new provider instance from `url`.
    ///
    /// The `Self: Sized` bound keeps this constructor out of the vtable, so
    /// the trait can still be used behind `dyn` (as the `Connectable*Provider`
    /// trait objects in this file do).
    fn new(url: String) -> Self
    where
        Self: Sized;
}
20
21pub trait Connectable {
22    fn set_client(&mut self);
23    fn get_connection(&self) -> &Client;
24    fn get_mut_connection(&mut self) -> &mut Client;
25}
26
/// Error type for pub/sub operations: a static, human-readable message.
pub type PubSubProviderError = &'static str;
28
/// Boxed, thread-safe callback that receives a `T` and returns a boxed future
/// resolving to `()`.
///
/// Both the callback and the future it returns must be `Send + Sync`.
pub type AsyncCallback<T> =
    Box<dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> + Send + Sync>;
31
32pub trait PubSubProvider {
33    type Channels;
34    fn subscribe(
35        &self,
36        callback: AsyncCallback<ChannelMessage>,
37        channel: Self::Channels,
38    ) -> Pin<Box<dyn Future<Output = Result<(), &'static str>> + Send + '_>>;
39    fn publish(&mut self, channel: Self::Channels, message: ChannelMessage);
40}
41
42pub trait FifoProvider {
43    fn push(&self, channel_name: ListChannel, job_key: &str, job_payload: serde_json::Value);
44    fn pop(&self, channel_name: ListChannel, job_key: &str) -> Result<ListMessage, &'static str>;
45
46    fn get_list_name(&self, channel_name: &str, job_key: &str) -> String {
47        format!("{}:{}", channel_name, job_key)
48    }
49}
50
51pub trait CacheProvider {
52    fn get(&self, cache_space: CacheSpace, key: &str, app_name: Option<String>) -> String;
53    fn delete(&self, cache_space: CacheSpace, key: &str, app_name: Option<String>);
54    fn set(
55        &self,
56        cache_space: CacheSpace,
57        key: &str,
58        value: serde_json::Value,
59        app_name: Option<String>,
60    );
61    /// Set a key with an explicit TTL in seconds. Defaults to `set` (no expiry) if not overridden.
62    fn set_with_ttl(
63        &self,
64        cache_space: CacheSpace,
65        key: &str,
66        value: serde_json::Value,
67        app_name: Option<String>,
68        ttl_secs: u64,
69    ) {
70        self.set(cache_space, key, value, app_name);
71        let _ = ttl_secs;
72    }
73
74    fn get_key_name(&self, cache_space: CacheSpace, key: &str, app_name: Option<String>) -> String {
75        if app_name.is_none() {
76            return format!("{}:{}", cache_space.get_space(), key);
77        }
78
79        format!("{}:{}:{}", cache_space.get_space(), app_name.unwrap(), key)
80    }
81
82    fn get_value(
83        &self,
84        app_name: Option<String>,
85        value: serde_json::Value,
86    ) -> CacheValue<serde_json::Value> {
87        if app_name.is_none() {
88            return CacheValue {
89                meta: None,
90                value: Some(value),
91            };
92        }
93
94        CacheValue {
95            meta: Some(MessageMeta {
96                app_name: app_name.unwrap(),
97            }),
98            value: Some(value),
99        }
100    }
101}
102
103pub trait ConnectableProvider: ProviderTrait + Connectable {}
104impl<T> ConnectableProvider for T where T: ProviderTrait + Connectable {}
105
106pub trait ConnectablePubSubProvider: ConnectableProvider + PubSubProvider {}
107impl<T> ConnectablePubSubProvider for T where T: ConnectableProvider + PubSubProvider {}
108
109pub trait ConnectableFifoProvider: ConnectableProvider + FifoProvider {}
110impl<T> ConnectableFifoProvider for T where T: ConnectableProvider + FifoProvider {}
111
112pub trait ConnectableCacheProvider: ConnectableProvider + CacheProvider {}
113impl<T> ConnectableCacheProvider for T where T: ConnectableProvider + CacheProvider {}
114
/// Selects which backend a provider is created for.
pub enum Provider {
    /// Redis-backed provider.
    Redis,
}
118
119pub enum ProviderEither {
120    Redis(Box<Redis>),
121}
122
123impl ProviderEither {
124    pub fn pub_sub(
125        self,
126    ) -> Result<
127        Box<dyn ConnectablePubSubProvider<Channels = SubscriptionChannel> + Send + Sync>,
128        &'static str,
129    > {
130        match self {
131            ProviderEither::Redis(provider) => Ok(provider
132                as Box<
133                    dyn ConnectablePubSubProvider<Channels = SubscriptionChannel> + Send + Sync,
134                >),
135        }
136    }
137
138    pub fn connectable(self) -> Result<Box<dyn ConnectableProvider + Send + Sync>, &'static str> {
139        match self {
140            ProviderEither::Redis(provider) => {
141                Ok(provider as Box<dyn ConnectableProvider + Send + Sync>)
142            }
143        }
144    }
145
146    pub fn fifo(self) -> Result<Box<dyn ConnectableFifoProvider + Send + Sync>, &'static str> {
147        match self {
148            ProviderEither::Redis(provider) => {
149                Ok(provider as Box<dyn ConnectableFifoProvider + Send + Sync>)
150            }
151        }
152    }
153
154    pub fn cache(self) -> Result<Box<dyn ConnectableCacheProvider + Send + Sync>, &'static str> {
155        match self {
156            ProviderEither::Redis(provider) => {
157                Ok(provider as Box<dyn ConnectableCacheProvider + Send + Sync>)
158            }
159        }
160    }
161}
162
163impl Provider {
164    pub(crate) fn create_provider(&self) -> ProviderEither {
165        match self {
166            Provider::Redis => {
167                dotenv().ok();
168                let url = env::var("FLIPPICO_CACHE_REDIS_URL");
169                if let Ok(url) = url {
170                    let mut provider = Redis::new(url);
171                    provider.set_client();
172                    ProviderEither::Redis(Box::new(provider))
173                } else {
174                    log::error!("FLIPPICO_CACHE_REDIS_URL is not set");
175                    panic!("FLIPPICO_CACHE_REDIS_URL is not set");
176                }
177            }
178        }
179    }
180}