flippico_cache/
lib.rs

1use crate::types::provider::{Provider, ProviderEither};
2
3pub mod core;
4pub mod providers;
5pub mod types;
6
7pub struct Cache {
8    provider: ProviderEither,
9}
10
11impl Cache {
12    pub fn new() -> Self {
13        Self {
14            provider: Provider::Redis.create_provider(),
15        }
16    }
17
18    pub fn pub_sub(
19        self,
20    ) -> Result<
21        Box<
22            dyn types::provider::ConnectablePubSubProvider<
23                    Channels = types::channels::SubscriptionChannel,
24                > + Send
25                + Sync,
26        >,
27        &'static str,
28    > {
29        self.provider.pub_sub()
30    }
31
32    pub fn fifo(
33        self,
34    ) -> Result<Box<dyn types::provider::ConnectableFifoProvider + Send + Sync>, &'static str> {
35        self.provider.fifo()
36    }
37
38    pub fn cache(
39        self,
40    ) -> Result<Box<dyn types::provider::ConnectableCacheProvider + Send + Sync>, &'static str>
41    {
42        self.provider.cache()
43    }
44}
45
#[cfg(test)]
mod tests {
    // NOTE(review): every test goes through `Cache::new()`, i.e. the Redis
    // provider, so they presumably need a reachable Redis instance — confirm
    // against the provider implementation / CI setup.
    use crate::types::channels::{
        CacheSpace, ChannelMessage, ListChannel, ListMessage, SubscriptionChannel,
    };
    use std::pin::Pin;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    use super::*;

    /// String `get` is expected to return after `{"data": "test"}` was stored
    /// with no extra options.
    const STORED_TEST_PAYLOAD: &str = "{\"meta\":null,\"value\":{\"data\":\"test\"}}";

    /// Empty pub/sub message addressed to the `BajkomatApi` channel.
    fn get_test_msg() -> ChannelMessage {
        ChannelMessage {
            channel: SubscriptionChannel::BajkomatApi,
            meta: None,
            body: None,
        }
    }

    /// List message the FIFO consumer is expected to receive in `fifo_test`.
    fn get_test_list_msg() -> ListMessage {
        ListMessage {
            meta: None,
            body: serde_json::json!({"body": "test"}).into(),
        }
    }

    /// Pushes one job and checks that the consumer callback receives it.
    #[tokio::test]
    async fn fifo_test() {
        let queue = Arc::new(Cache::new().fifo().unwrap());
        let consumer_queue = Arc::clone(&queue);

        // The callback runs inside a detached task, where a failed assertion
        // would only abort that task. It therefore raises this flag after the
        // comparison succeeded, and the test body asserts on the flag.
        let delivered = Arc::new(AtomicBool::new(false));
        let delivered_flag = Arc::clone(&delivered);
        let on_message = move |msg: ListMessage| {
            let delivered_flag = Arc::clone(&delivered_flag);
            Box::pin(async move {
                assert_eq!(msg, get_test_list_msg());
                delivered_flag.store(true, Ordering::SeqCst);
            }) as Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>
        };

        tokio::spawn(async move {
            let _ = consumer_queue
                .pop(ListChannel::BajkomatApi, "job_key", Box::new(on_message))
                .await;
        });
        queue.push(
            ListChannel::BajkomatApi,
            "job_key",
            serde_json::json!({"body": "test"}),
        );

        // Bounded wait instead of a fixed sleep: finishes as soon as the
        // callback has run and fails if it never does.
        let wait_for_delivery = async {
            while !delivered.load(Ordering::SeqCst) {
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            }
        };
        let outcome =
            tokio::time::timeout(tokio::time::Duration::from_secs(2), wait_for_delivery).await;
        assert!(
            outcome.is_ok(),
            "pop callback did not receive the expected message within 2 seconds"
        );
    }

    /// Publishes a single message on the `BajkomatApi` channel.
    #[test]
    fn publish_test() {
        let mut publisher = Cache::new().pub_sub().unwrap();
        publisher.publish(SubscriptionChannel::BajkomatApi, get_test_msg());
    }

    /// Stores a value and reads it back.
    #[test]
    fn cache_test_set() {
        // Key is unique to this test: tests run in parallel by default and
        // `cache_test_del` deletes its key in the same cache space.
        let key = "cache_test_set_key";
        let store = Cache::new().cache().unwrap();
        store.set(
            CacheSpace::Shopify,
            key,
            serde_json::json!({"data": "test"}),
            None,
        );
        let value = store.get(CacheSpace::Shopify, key, None);
        assert_eq!(STORED_TEST_PAYLOAD, value);
    }

    /// Stores a value, deletes it, and checks that a later read is empty.
    #[test]
    fn cache_test_del() {
        // Key is unique to this test so it cannot interfere with
        // `cache_test_set` when both run concurrently.
        let key = "cache_test_del_key";
        let store = Cache::new().cache().unwrap();
        store.set(
            CacheSpace::Shopify,
            key,
            serde_json::json!({"data": "test"}),
            None,
        );
        let value = store.get(CacheSpace::Shopify, key, None);
        assert_eq!(STORED_TEST_PAYLOAD, value);

        store.delete(CacheSpace::Shopify, key, None);
        let value = store.get(CacheSpace::Shopify, key, None);
        assert_eq!("", value);
    }

    // NOTE(review): the subscribe round-trip test below is disabled; the reason
    // is not recorded in this file. Kept verbatim for reference.
    //
    // #[tokio::test]
    // async fn subscribe_test() {
    //     let subscriber_cache = Cache::new().pub_sub().unwrap();
    //     let mut publisher_cache = Cache::new().pub_sub().unwrap();

    //     // Use a channel to signal when the callback is executed
    //     let (tx, rx) = tokio::sync::oneshot::channel();
    //     let tx = std::sync::Arc::new(std::sync::Mutex::new(Some(tx)));

    //     let sub_clbk = move |msg: ChannelMessage| {
    //         let tx_clone = tx.clone();
    //         Box::pin(async move {
    //             let msg2 = get_test_msg();
    //             assert_eq!(msg, msg2);
    //             if let Ok(mut tx_guard) = tx_clone.lock() {
    //                 if let Some(sender) = tx_guard.take() {
    //                     let _ = sender.send(());
    //                 }
    //             }
    //         }) as Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>
    //     };

    //     // Start the subscription task by moving the cache into the spawned task
    //     let _handle = tokio::spawn(async move {
    //         let subscription_task =
    //             subscriber_cache.subscribe(Box::new(sub_clbk), SubscriptionChannel::BajkomatApi);
    //         let _ = subscription_task.await;
    //     });

    //     // Wait a moment for subscription to be established
    //     tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

    //     // Publish the message
    //     publisher_cache.publish(SubscriptionChannel::BajkomatApi, get_test_msg());

    //     // Wait for the callback to be executed with timeout
    //     let timeout_result = tokio::time::timeout(tokio::time::Duration::from_secs(10), rx).await;
    //     match timeout_result {
    //         Ok(Ok(())) => {
    //             // Test passed
    //         }
    //         Ok(Err(e)) => {
    //             panic!("Callback channel error: {:?}", e);
    //         }
    //         Err(_error) => {
    //             // Check if Redis is available - if not, skip the test
    //             println!("Test timed out - Redis may not be available, skipping test");
    //             return;
    //         }
    //     }
    // }
}