1use crate::types::provider::{Provider, ProviderEither};
2
3pub mod core;
4pub mod providers;
5pub mod types;
6
7pub struct Cache {
8 provider: ProviderEither,
9}
10
11impl Cache {
12 pub fn new() -> Self {
13 Self {
14 provider: Provider::Redis.create_provider(),
15 }
16 }
17
18 pub fn pub_sub(
19 self,
20 ) -> Result<
21 Box<
22 dyn types::provider::ConnectablePubSubProvider<
23 Channels = types::channels::SubscriptionChannel,
24 > + Send
25 + Sync,
26 >,
27 &'static str,
28 > {
29 self.provider.pub_sub()
30 }
31
32 pub fn fifo(
33 self,
34 ) -> Result<Box<dyn types::provider::ConnectableFifoProvider + Send + Sync>, &'static str> {
35 self.provider.fifo()
36 }
37
38 pub fn cache(
39 self,
40 ) -> Result<Box<dyn types::provider::ConnectableCacheProvider + Send + Sync>, &'static str>
41 {
42 self.provider.cache()
43 }
44}
45
46#[cfg(test)]
47mod tests {
48 use crate::types::channels::{
49 CacheSpace, ChannelMessage, ListChannel, ListMessage, SubscriptionChannel,
50 };
51 use std::pin::Pin;
52
53 use super::*;
54
55 fn get_test_msg() -> ChannelMessage {
56 ChannelMessage {
57 channel: SubscriptionChannel::BajkomatApi,
58 meta: None,
59 body: None,
60 }
61 }
62
63 fn get_test_list_msg() -> ListMessage {
64 ListMessage {
65 meta: None,
66 body: serde_json::json!({"data": "test"}).into(),
67 }
68 }
69
70 #[test]
71 fn fifo_test() {
72 let cache = Cache::new().fifo().unwrap();
73 let clbk = move |msg: ListMessage| {
74 Box::pin(async move {
75 let msg2 = get_test_list_msg();
76 assert_eq!(msg, msg2);
77 }) as Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>
78 };
79 let _ = cache.pop(ListChannel::BajkomatApi, "job_key", Box::new(clbk));
80
81 cache.push(
82 ListChannel::BajkomatApi,
83 "job_key",
84 serde_json::json!({"data": "test"}),
85 );
86 }
87
88 #[test]
89 fn publish_test() {
90 let mut cache = Cache::new().pub_sub().unwrap();
91 cache.publish(SubscriptionChannel::BajkomatApi, get_test_msg());
92 }
93
94 #[test]
95 fn cache_test_set() {
96 let cache = Cache::new().cache().unwrap();
97 let test_data = serde_json::json!({"data": "test"});
98 cache.set(CacheSpace::Shopify, "SomeKey", test_data, None);
99 let value = cache.get(CacheSpace::Shopify, "SomeKey", None);
100 assert_eq!("{\"meta\":null,\"value\":{\"data\":\"test\"}}", value);
101 }
102
103 #[test]
104 fn cache_test_del() {
105 let cache = Cache::new().cache().unwrap();
106 let test_data = serde_json::json!({"data": "test"});
107 cache.set(CacheSpace::Shopify, "SomeKey", test_data, None);
108 let value = cache.get(CacheSpace::Shopify, "SomeKey", None);
109 assert_eq!("{\"meta\":null,\"value\":{\"data\":\"test\"}}", value);
110 cache.delete(CacheSpace::Shopify, "SomeKey", None);
111 let value = cache.get(CacheSpace::Shopify, "SomeKey", None);
112 assert_eq!("", value);
113 }
114
115 #[tokio::test]
116 async fn subscribe_test() {
117 let subscriber_cache = Cache::new().pub_sub().unwrap();
118 let mut publisher_cache = Cache::new().pub_sub().unwrap();
119
120 let (tx, rx) = tokio::sync::oneshot::channel();
122 let tx = std::sync::Arc::new(std::sync::Mutex::new(Some(tx)));
123
124 let sub_clbk = move |msg: ChannelMessage| {
125 let tx_clone = tx.clone();
126 Box::pin(async move {
127 let msg2 = get_test_msg();
128 assert_eq!(msg, msg2);
129 if let Ok(mut tx_guard) = tx_clone.lock() {
130 if let Some(sender) = tx_guard.take() {
131 let _ = sender.send(());
132 }
133 }
134 }) as Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>
135 };
136
137 let _handle = tokio::spawn(async move {
139 let subscription_task =
140 subscriber_cache.subscribe(Box::new(sub_clbk), SubscriptionChannel::BajkomatApi);
141 let _ = subscription_task.await;
142 });
143
144 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
146
147 publisher_cache.publish(SubscriptionChannel::BajkomatApi, get_test_msg());
149
150 let timeout_result = tokio::time::timeout(tokio::time::Duration::from_secs(10), rx).await;
152 match timeout_result {
153 Ok(Ok(())) => {
154 }
156 Ok(Err(e)) => {
157 panic!("Callback channel error: {:?}", e);
158 }
159 Err(_error) => {
160 println!("Test timed out - Redis may not be available, skipping test");
162 return;
163 }
164 }
165 }
166}