Skip to main content

flippico_cache/types/
provider.rs

1use redis::Client;
2
3use crate::{
4    providers::redis::Redis,
5    types::channels::{
6        CacheSpace, CacheValue, ChannelMessage, ListChannel, ListMessage, MessageMeta,
7        SubscriptionChannel,
8    },
9};
10use dotenv::dotenv;
11use std::env;
12use std::future::Future;
13use std::pin::Pin;
14
15pub trait ProviderTrait {
16    fn new(url: String) -> Self
17    where
18        Self: Sized;
19}
20
21pub trait Connectable {
22    fn set_client(&mut self);
23    fn get_connection(&self) -> &Client;
24    fn get_mut_connection(&mut self) -> &mut Client;
25}
26
27pub type PubSubProviderError = &'static str;
28
29pub type AsyncCallback<T> =
30    Box<dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> + Send + Sync>;
31
32pub trait PubSubProvider {
33    type Channels;
34    fn subscribe(
35        &self,
36        callback: AsyncCallback<ChannelMessage>,
37        channel: Self::Channels,
38    ) -> Pin<Box<dyn Future<Output = Result<(), &'static str>> + Send + '_>>;
39    fn publish(&mut self, channel: Self::Channels, message: ChannelMessage);
40}
41
42pub trait FifoProvider {
43    fn push(&self, channel_name: ListChannel, job_key: &str, job_payload: serde_json::Value);
44    fn pop(&self, channel_name: ListChannel, job_key: &str) -> Result<ListMessage, &'static str>;
45
46    fn get_list_name(&self, channel_name: &str, job_key: &str) -> String {
47        format!("{}:{}", channel_name, job_key)
48    }
49}
50
51pub trait CacheProvider {
52    fn get(&self, cache_space: CacheSpace, key: &str, app_name: Option<String>) -> String;
53    fn delete(&self, cache_space: CacheSpace, key: &str, app_name: Option<String>);
54    fn set(
55        &self,
56        cache_space: CacheSpace,
57        key: &str,
58        value: serde_json::Value,
59        app_name: Option<String>,
60    );
61    /// Set a key with an explicit TTL in seconds. Defaults to `set` (no expiry) if not overridden.
62    fn set_with_ttl(
63        &self,
64        cache_space: CacheSpace,
65        key: &str,
66        value: serde_json::Value,
67        app_name: Option<String>,
68        ttl_secs: u64,
69    ) {
70        self.set(cache_space, key, value, app_name);
71        let _ = ttl_secs;
72    }
73
74    fn get_key_name(&self, cache_space: CacheSpace, key: &str, app_name: Option<String>) -> String {
75        if app_name.is_none() {
76            return format!("{}:{}", cache_space.get_space(), key);
77        }
78
79        format!("{}:{}:{}", cache_space.get_space(), app_name.unwrap(), key)
80    }
81
82    fn get_value(
83        &self,
84        app_name: Option<String>,
85        value: serde_json::Value,
86    ) -> CacheValue<serde_json::Value> {
87        if app_name.is_none() {
88            return CacheValue {
89                meta: None,
90                value: Some(value),
91            };
92        }
93
94        CacheValue {
95            meta: Some(MessageMeta {
96                app_name: app_name.unwrap(),
97            }),
98            value: Some(value),
99        }
100    }
101}
102
103/// Sorted set operations for rate limiting, leaderboards, etc.
104pub trait SortedSetProvider {
105    /// Add a member with the given score. Returns the number of new elements added.
106    fn zadd(&self, key: &str, score: f64, member: &str) -> Result<u32, &'static str>;
107
108    /// Remove all members with scores between min and max (inclusive).
109    /// Returns the number of removed members.
110    fn zremrangebyscore(&self, key: &str, min: f64, max: f64) -> Result<u32, &'static str>;
111
112    /// Return the number of members in the sorted set.
113    fn zcard(&self, key: &str) -> Result<u32, &'static str>;
114
115    /// Return members with scores in the given range, ordered by score ascending.
116    /// Includes scores in the result as `(member, score)` pairs.
117    fn zrangebyscore_withscores(
118        &self,
119        key: &str,
120        min: f64,
121        max: f64,
122        limit: Option<usize>,
123    ) -> Result<Vec<(String, f64)>, &'static str>;
124
125    /// Set a TTL (in seconds) on the key. Useful for auto-cleanup of sliding windows.
126    fn expire(&self, key: &str, ttl_secs: u64) -> Result<(), &'static str>;
127
128    /// Execute a Lua script with the given keys and args. Returns a vector of integers.
129    fn eval_script(
130        &self,
131        script: &str,
132        keys: &[&str],
133        args: &[&str],
134    ) -> Result<Vec<i64>, &'static str>;
135}
136
137pub trait ConnectableProvider: ProviderTrait + Connectable {}
138impl<T> ConnectableProvider for T where T: ProviderTrait + Connectable {}
139
140pub trait ConnectablePubSubProvider: ConnectableProvider + PubSubProvider {}
141impl<T> ConnectablePubSubProvider for T where T: ConnectableProvider + PubSubProvider {}
142
143pub trait ConnectableFifoProvider: ConnectableProvider + FifoProvider {}
144impl<T> ConnectableFifoProvider for T where T: ConnectableProvider + FifoProvider {}
145
146pub trait ConnectableCacheProvider: ConnectableProvider + CacheProvider {}
147impl<T> ConnectableCacheProvider for T where T: ConnectableProvider + CacheProvider {}
148
149pub trait ConnectableSortedSetProvider: ConnectableProvider + SortedSetProvider {}
150impl<T> ConnectableSortedSetProvider for T where T: ConnectableProvider + SortedSetProvider {}
151
152pub enum Provider {
153    Redis,
154}
155
156pub enum ProviderEither {
157    Redis(Box<Redis>),
158}
159
160impl ProviderEither {
161    pub fn pub_sub(
162        self,
163    ) -> Result<
164        Box<dyn ConnectablePubSubProvider<Channels = SubscriptionChannel> + Send + Sync>,
165        &'static str,
166    > {
167        match self {
168            ProviderEither::Redis(provider) => Ok(provider
169                as Box<
170                    dyn ConnectablePubSubProvider<Channels = SubscriptionChannel> + Send + Sync,
171                >),
172        }
173    }
174
175    pub fn connectable(self) -> Result<Box<dyn ConnectableProvider + Send + Sync>, &'static str> {
176        match self {
177            ProviderEither::Redis(provider) => {
178                Ok(provider as Box<dyn ConnectableProvider + Send + Sync>)
179            }
180        }
181    }
182
183    pub fn fifo(self) -> Result<Box<dyn ConnectableFifoProvider + Send + Sync>, &'static str> {
184        match self {
185            ProviderEither::Redis(provider) => {
186                Ok(provider as Box<dyn ConnectableFifoProvider + Send + Sync>)
187            }
188        }
189    }
190
191    pub fn cache(self) -> Result<Box<dyn ConnectableCacheProvider + Send + Sync>, &'static str> {
192        match self {
193            ProviderEither::Redis(provider) => {
194                Ok(provider as Box<dyn ConnectableCacheProvider + Send + Sync>)
195            }
196        }
197    }
198
199    pub fn sorted_set(
200        self,
201    ) -> Result<Box<dyn ConnectableSortedSetProvider + Send + Sync>, &'static str> {
202        match self {
203            ProviderEither::Redis(provider) => {
204                Ok(provider as Box<dyn ConnectableSortedSetProvider + Send + Sync>)
205            }
206        }
207    }
208}
209
210impl Provider {
211    pub(crate) fn create_provider(&self) -> ProviderEither {
212        match self {
213            Provider::Redis => {
214                dotenv().ok();
215                let url = env::var("FLIPPICO_CACHE_REDIS_URL");
216                if let Ok(url) = url {
217                    let mut provider = Redis::new(url);
218                    provider.set_client();
219                    ProviderEither::Redis(Box::new(provider))
220                } else {
221                    log::error!("FLIPPICO_CACHE_REDIS_URL is not set");
222                    panic!("FLIPPICO_CACHE_REDIS_URL is not set");
223                }
224            }
225        }
226    }
227}