flippico_cache/
lib.rs

1use crate::types::provider::{Provider, ProviderEither};
2
3pub mod core;
4pub mod providers;
5pub mod types;
6
7pub struct Cache {
8    provider: ProviderEither,
9}
10
11impl Cache {
12    pub fn new() -> Self {
13        Self {
14            provider: Provider::Redis.create_provider(),
15        }
16    }
17
18    pub fn pub_sub(
19        self,
20    ) -> Result<
21        Box<
22            dyn types::provider::ConnectablePubSubProvider<
23                    Channels = types::channels::SubscriptionChannel,
24                > + Send
25                + Sync,
26        >,
27        &'static str,
28    > {
29        self.provider.pub_sub()
30    }
31
32    pub fn fifo(
33        self,
34    ) -> Result<Box<dyn types::provider::ConnectableFifoProvider + Send + Sync>, &'static str> {
35        self.provider.fifo()
36    }
37
38    pub fn cache(
39        self,
40    ) -> Result<Box<dyn types::provider::ConnectableCacheProvider + Send + Sync>, &'static str>
41    {
42        self.provider.cache()
43    }
44}
45
#[cfg(test)]
mod tests {
    //! Tests that drive the [`Cache`] facade end to end.
    //!
    //! NOTE(review): `Cache::new` always selects `Provider::Redis`, so these
    //! presumably need a reachable Redis instance — confirm against the
    //! provider implementation.

    use crate::types::channels::{
        CacheSpace, ChannelMessage, ListChannel, ListMessage, SubscriptionChannel,
    };
    use std::pin::Pin;

    use super::*;

    /// Message fixture shared by the publish and subscribe tests.
    fn get_test_msg() -> ChannelMessage {
        ChannelMessage {
            channel: SubscriptionChannel::BajkomatApi,
            meta: None,
            body: None,
        }
    }

    /// List-message fixture carrying the same JSON payload that `fifo_test` pushes.
    fn get_test_list_msg() -> ListMessage {
        ListMessage {
            meta: None,
            body: serde_json::json!({"data": "test"}).into(),
        }
    }

    /// Registers a pop callback on a list channel, then pushes one payload to it.
    #[test]
    fn fifo_test() {
        let cache = Cache::new().fifo().unwrap();
        let clbk = move |msg: ListMessage| {
            Box::pin(async move {
                let msg2 = get_test_list_msg();
                assert_eq!(msg, msg2);
            }) as Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>
        };
        // NOTE(review): the value returned by `pop` is discarded. If it is a
        // future or a fallible result, the callback above may never run and the
        // assertion inside it would be skipped — verify against the provider.
        let _ = cache.pop(ListChannel::BajkomatApi, "job_key", Box::new(clbk));

        cache.push(
            ListChannel::BajkomatApi,
            "job_key",
            serde_json::json!({"data": "test"}),
        );
    }

    /// Publishes a single message; passes as long as `publish` does not panic.
    #[test]
    fn publish_test() {
        let mut cache = Cache::new().pub_sub().unwrap();
        cache.publish(SubscriptionChannel::BajkomatApi, get_test_msg());
    }

    /// Stores a JSON value and checks the serialized envelope read back.
    #[test]
    fn cache_test_set() {
        let cache = Cache::new().cache().unwrap();
        let test_data = serde_json::json!({"data": "test"});
        // The test harness runs tests on parallel threads by default, so this
        // key must differ from the one `cache_test_del` deletes; sharing a key
        // lets that delete land between the `set` and `get` below.
        let key = "cache_test_set_key";
        cache.set(CacheSpace::Shopify, key, test_data, None);
        let value = cache.get(CacheSpace::Shopify, key, None);
        assert_eq!("{\"meta\":null,\"value\":{\"data\":\"test\"}}", value);
    }

    /// Stores a value, deletes it, and checks that a later read yields `""`.
    #[test]
    fn cache_test_del() {
        let cache = Cache::new().cache().unwrap();
        let test_data = serde_json::json!({"data": "test"});
        // Dedicated key: see `cache_test_set` for why the two tests must not
        // share one.
        let key = "cache_test_del_key";
        cache.set(CacheSpace::Shopify, key, test_data, None);
        let value = cache.get(CacheSpace::Shopify, key, None);
        assert_eq!("{\"meta\":null,\"value\":{\"data\":\"test\"}}", value);
        cache.delete(CacheSpace::Shopify, key, None);
        let value = cache.get(CacheSpace::Shopify, key, None);
        assert_eq!("", value);
    }

    /// Subscribes on one handle, publishes on another, and waits for the
    /// subscriber callback to signal that it received the message.
    #[tokio::test]
    async fn subscribe_test() {
        let subscriber_cache = Cache::new().pub_sub().unwrap();
        let mut publisher_cache = Cache::new().pub_sub().unwrap();

        // Use a channel to signal when the callback is executed. The sender
        // sits in `Arc<Mutex<Option<_>>>` so the callback can `take()` it the
        // first time it fires, even though the closure may be called repeatedly.
        let (tx, rx) = tokio::sync::oneshot::channel();
        let tx = std::sync::Arc::new(std::sync::Mutex::new(Some(tx)));

        let sub_clbk = move |msg: ChannelMessage| {
            let tx_clone = tx.clone();
            Box::pin(async move {
                let msg2 = get_test_msg();
                assert_eq!(msg, msg2);
                if let Ok(mut tx_guard) = tx_clone.lock() {
                    if let Some(sender) = tx_guard.take() {
                        let _ = sender.send(());
                    }
                }
            }) as Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>
        };

        // Start the subscription task by moving the cache into the spawned task
        let _handle = tokio::spawn(async move {
            let subscription_task =
                subscriber_cache.subscribe(Box::new(sub_clbk), SubscriptionChannel::BajkomatApi);
            let _ = subscription_task.await;
        });

        // Wait a moment for subscription to be established
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // Publish the message
        publisher_cache.publish(SubscriptionChannel::BajkomatApi, get_test_msg());

        // Wait for the callback to be executed with timeout
        let timeout_result = tokio::time::timeout(tokio::time::Duration::from_secs(10), rx).await;
        match timeout_result {
            Ok(Ok(())) => {
                // Test passed
            }
            Ok(Err(e)) => {
                // The sender was dropped without sending, e.g. the callback
                // panicked on its assertion or the subscription task ended.
                panic!("Callback channel error: {:?}", e);
            }
            Err(_elapsed) => {
                // Deliberate best-effort: a timeout is treated as "Redis not
                // available" and the test passes instead of failing.
                println!("Test timed out - Redis may not be available, skipping test");
            }
        }
    }
}