// flippico_cache/types/provider.rs

use std::env;
use std::future::Future;
use std::pin::Pin;

use dotenv::dotenv;
use redis::Client;

use crate::{
    providers::redis::Redis,
    types::channels::{
        CacheSpace, CacheValue, ChannelMessage, ListChannel, ListMessage, MessageMeta,
        SubscriptionChannel,
    },
};
14
/// Constructor contract implemented by every backend provider.
///
/// The `Self: Sized` bound on [`ProviderTrait::new`] excludes the constructor
/// from the trait's vtable, so the trait can still be used as
/// `dyn ProviderTrait` (and as a supertrait of other object-safe traits).
pub trait ProviderTrait {
    /// Builds a provider from the connection `url`.
    ///
    /// NOTE(review): whether the URL is validated or a connection is opened
    /// at this point is decided by the implementor; nothing here enforces it.
    fn new(url: String) -> Self
    where
        Self: Sized;
}
20
21pub trait Connectable {
22    fn set_client(&mut self);
23    fn get_connection(&self) -> &Client;
24    fn get_mut_connection(&mut self) -> &mut Client;
25}
26
/// Error type reported by pub/sub operations: a static, human-readable message.
pub type PubSubProviderError = &'static str;
28
/// Boxed callback that takes a value of type `T` and returns a pinned, boxed
/// future resolving to `()`.
///
/// Both the callback and the future it returns must be `Send + Sync`, so the
/// callback can be shared across threads and the returned future can be moved
/// between them.
pub type AsyncCallback<T> =
    Box<dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> + Send + Sync>;
31
32pub trait PubSubProvider {
33    type Channels;
34    fn subscribe(
35        &self,
36        callback: AsyncCallback<ChannelMessage>,
37        channel: Self::Channels,
38    ) -> Pin<Box<dyn Future<Output = Result<(), &'static str>> + Send + '_>>;
39    fn publish(&mut self, channel: Self::Channels, message: ChannelMessage);
40}
41
42pub trait FifoProvider {
43    fn push(&self, channel_name: ListChannel, job_key: &str, job_payload: serde_json::Value);
44    fn pop(&self, channel_name: ListChannel, job_key: &str) -> Result<ListMessage, &'static str>;
45
46    fn get_list_name(&self, channel_name: &str, job_key: &str) -> String {
47        format!("{}:{}", channel_name, job_key)
48    }
49}
50
51pub trait CacheProvider {
52    fn get(&self, cache_space: CacheSpace, key: &str, app_name: Option<String>) -> String;
53    fn delete(&self, cache_space: CacheSpace, key: &str, app_name: Option<String>);
54    fn set(
55        &self,
56        cache_space: CacheSpace,
57        key: &str,
58        value: serde_json::Value,
59        app_name: Option<String>,
60    );
61
62    fn get_key_name(&self, cache_space: CacheSpace, key: &str, app_name: Option<String>) -> String {
63        if app_name.is_none() {
64            return format!("{}:{}", cache_space.get_space(), key);
65        }
66
67        format!("{}:{}:{}", cache_space.get_space(), app_name.unwrap(), key)
68    }
69
70    fn get_value(
71        &self,
72        app_name: Option<String>,
73        value: serde_json::Value,
74    ) -> CacheValue<serde_json::Value> {
75        if app_name.is_none() {
76            return CacheValue {
77                meta: None,
78                value: Some(value),
79            };
80        }
81
82        CacheValue {
83            meta: Some(MessageMeta {
84                app_name: app_name.unwrap(),
85            }),
86            value: Some(value),
87        }
88    }
89}
90
91pub trait ConnectableProvider: ProviderTrait + Connectable {}
92impl<T> ConnectableProvider for T where T: ProviderTrait + Connectable {}
93
94pub trait ConnectablePubSubProvider: ConnectableProvider + PubSubProvider {}
95impl<T> ConnectablePubSubProvider for T where T: ConnectableProvider + PubSubProvider {}
96
97pub trait ConnectableFifoProvider: ConnectableProvider + FifoProvider {}
98impl<T> ConnectableFifoProvider for T where T: ConnectableProvider + FifoProvider {}
99
100pub trait ConnectableCacheProvider: ConnectableProvider + CacheProvider {}
101impl<T> ConnectableCacheProvider for T where T: ConnectableProvider + CacheProvider {}
102
/// Selects which backend `create_provider` should build.
///
/// The enum is fieldless, so it derives the cheap standard traits: it can be
/// copied, compared and printed in logs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Provider {
    /// The Redis-backed provider.
    Redis,
}
106
107pub enum ProviderEither {
108    Redis(Box<Redis>),
109}
110
111impl ProviderEither {
112    pub fn pub_sub(
113        self,
114    ) -> Result<
115        Box<dyn ConnectablePubSubProvider<Channels = SubscriptionChannel> + Send + Sync>,
116        &'static str,
117    > {
118        match self {
119            ProviderEither::Redis(provider) => Ok(provider
120                as Box<
121                    dyn ConnectablePubSubProvider<Channels = SubscriptionChannel> + Send + Sync,
122                >),
123        }
124    }
125
126    pub fn connectable(self) -> Result<Box<dyn ConnectableProvider + Send + Sync>, &'static str> {
127        match self {
128            ProviderEither::Redis(provider) => {
129                Ok(provider as Box<dyn ConnectableProvider + Send + Sync>)
130            }
131        }
132    }
133
134    pub fn fifo(self) -> Result<Box<dyn ConnectableFifoProvider + Send + Sync>, &'static str> {
135        match self {
136            ProviderEither::Redis(provider) => {
137                Ok(provider as Box<dyn ConnectableFifoProvider + Send + Sync>)
138            }
139        }
140    }
141
142    pub fn cache(self) -> Result<Box<dyn ConnectableCacheProvider + Send + Sync>, &'static str> {
143        match self {
144            ProviderEither::Redis(provider) => {
145                Ok(provider as Box<dyn ConnectableCacheProvider + Send + Sync>)
146            }
147        }
148    }
149}
150
151impl Provider {
152    pub(crate) fn create_provider(&self) -> ProviderEither {
153        match self {
154            Provider::Redis => {
155                dotenv().ok();
156                let url = env::var("FLIPPICO_CACHE_REDIS_URL");
157                if let Ok(url) = url {
158                    let mut provider = Redis::new(url);
159                    provider.set_client();
160                    ProviderEither::Redis(Box::new(provider))
161                } else {
162                    log::error!("FLIPPICO_CACHE_REDIS_URL is not set");
163                    panic!("FLIPPICO_CACHE_REDIS_URL is not set");
164                }
165            }
166        }
167    }
168}