flippico_cache/lib.rs
1use crate::types::provider::{Provider, ProviderEither};
2
3pub mod core;
4pub mod providers;
5pub mod types;
6
7pub struct Cache {
8 provider: ProviderEither,
9}
10
11impl Cache {
12 pub fn new() -> Self {
13 Self {
14 provider: Provider::Redis.create_provider(),
15 }
16 }
17
18 pub fn pub_sub(
19 self,
20 ) -> Result<
21 Box<
22 dyn types::provider::ConnectablePubSubProvider<
23 Channels = types::channels::SubscriptionChannel,
24 > + Send
25 + Sync,
26 >,
27 &'static str,
28 > {
29 self.provider.pub_sub()
30 }
31
32 pub fn fifo(
33 self,
34 ) -> Result<Box<dyn types::provider::ConnectableFifoProvider + Send + Sync>, &'static str> {
35 self.provider.fifo()
36 }
37
38 pub fn cache(
39 self,
40 ) -> Result<Box<dyn types::provider::ConnectableCacheProvider + Send + Sync>, &'static str>
41 {
42 self.provider.cache()
43 }
44}
45
46#[cfg(test)]
47mod tests {
48 use crate::types::channels::{
49 CacheSpace, ChannelMessage, ListChannel, ListMessage, SubscriptionChannel,
50 };
51 use std::pin::Pin;
52 use std::sync::Arc;
53
54 use super::*;
55
56 fn get_test_msg() -> ChannelMessage {
57 ChannelMessage {
58 channel: SubscriptionChannel::BajkomatApi,
59 meta: None,
60 body: None,
61 }
62 }
63
64 fn get_test_list_msg() -> ListMessage {
65 ListMessage {
66 meta: None,
67 body: serde_json::json!({"body": "test"}).into(),
68 }
69 }
70
71 #[tokio::test]
72 async fn fifo_test() {
73 let cache = Arc::new(Cache::new().fifo().unwrap());
74 let cache_clone = cache.clone();
75 let clbk = move |msg: ListMessage| {
76 Box::pin(async move {
77 let msg2 = get_test_list_msg();
78 assert_eq!(msg, msg2);
79 }) as Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>
80 };
81 tokio::spawn(async move {
82 let _ = cache_clone
83 .pop(ListChannel::BajkomatApi, "job_key", Box::new(clbk))
84 .await;
85 });
86 cache.push(
87 ListChannel::BajkomatApi,
88 "job_key",
89 // serde_json::json!(get_test_list_msg()),
90 serde_json::json!({"body": "test"}),
91 );
92 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
93 }
94
95 #[test]
96 fn publish_test() {
97 let mut cache = Cache::new().pub_sub().unwrap();
98 cache.publish(SubscriptionChannel::BajkomatApi, get_test_msg());
99 }
100
101 #[test]
102 fn cache_test_set() {
103 let cache = Cache::new().cache().unwrap();
104 let test_data = serde_json::json!({"data": "test"});
105 cache.set(CacheSpace::Shopify, "SomeKey", test_data, None);
106 let value = cache.get(CacheSpace::Shopify, "SomeKey", None);
107 assert_eq!("{\"meta\":null,\"value\":{\"data\":\"test\"}}", value);
108 }
109
110 #[test]
111 fn cache_test_del() {
112 let cache = Cache::new().cache().unwrap();
113 let test_data = serde_json::json!({"data": "test"});
114 cache.set(CacheSpace::Shopify, "SomeKey", test_data, None);
115 let value = cache.get(CacheSpace::Shopify, "SomeKey", None);
116 assert_eq!("{\"meta\":null,\"value\":{\"data\":\"test\"}}", value);
117 cache.delete(CacheSpace::Shopify, "SomeKey", None);
118 let value = cache.get(CacheSpace::Shopify, "SomeKey", None);
119 assert_eq!("", value);
120 }
121
122 // #[tokio::test]
123 // async fn subscribe_test() {
124 // let subscriber_cache = Cache::new().pub_sub().unwrap();
125 // let mut publisher_cache = Cache::new().pub_sub().unwrap();
126
127 // // Use a channel to signal when the callback is executed
128 // let (tx, rx) = tokio::sync::oneshot::channel();
129 // let tx = std::sync::Arc::new(std::sync::Mutex::new(Some(tx)));
130
131 // let sub_clbk = move |msg: ChannelMessage| {
132 // let tx_clone = tx.clone();
133 // Box::pin(async move {
134 // let msg2 = get_test_msg();
135 // assert_eq!(msg, msg2);
136 // if let Ok(mut tx_guard) = tx_clone.lock() {
137 // if let Some(sender) = tx_guard.take() {
138 // let _ = sender.send(());
139 // }
140 // }
141 // }) as Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>
142 // };
143
144 // // Start the subscription task by moving the cache into the spawned task
145 // let _handle = tokio::spawn(async move {
146 // let subscription_task =
147 // subscriber_cache.subscribe(Box::new(sub_clbk), SubscriptionChannel::BajkomatApi);
148 // let _ = subscription_task.await;
149 // });
150
151 // // Wait a moment for subscription to be established
152 // tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
153
154 // // Publish the message
155 // publisher_cache.publish(SubscriptionChannel::BajkomatApi, get_test_msg());
156
157 // // Wait for the callback to be executed with timeout
158 // let timeout_result = tokio::time::timeout(tokio::time::Duration::from_secs(10), rx).await;
159 // match timeout_result {
160 // Ok(Ok(())) => {
161 // // Test passed
162 // }
163 // Ok(Err(e)) => {
164 // panic!("Callback channel error: {:?}", e);
165 // }
166 // Err(_error) => {
167 // // Check if Redis is available - if not, skip the test
168 // println!("Test timed out - Redis may not be available, skipping test");
169 // return;
170 // }
171 // }
172 // }
173}