flippico_cache/
lib.rs

1use crate::types::provider::{Provider, ProviderEither};
2
3pub mod core;
4pub mod providers;
5pub mod types;
6
7pub struct Cache {
8    provider: ProviderEither,
9}
10
11impl Cache {
12    pub fn new() -> Self {
13        Self {
14            provider: Provider::Redis.create_provider(),
15        }
16    }
17
18    pub fn pub_sub(
19        self,
20    ) -> Result<
21        Box<
22            dyn types::provider::ConnectablePubSubProvider<
23                    Channels = types::channels::SubscriptionChannel,
24                > + Send
25                + Sync,
26        >,
27        &'static str,
28    > {
29        self.provider.pub_sub()
30    }
31}
32
33#[cfg(test)]
34mod tests {
35    use crate::types::channels::{ChannelMessage, SubscriptionChannel};
36
37    use super::*;
38
39    fn get_test_msg() -> ChannelMessage {
40        ChannelMessage {
41            channel: SubscriptionChannel::BajkomatApi,
42            body: None,
43        }
44    }
45
46    #[test]
47    fn publish_test() {
48        let mut cache = Cache::new().pub_sub().unwrap();
49        cache.publish(SubscriptionChannel::BajkomatApi, get_test_msg());
50    }
51
52    #[tokio::test]
53    async fn subscribe_test() {
54        let subscriber_cache = Cache::new().pub_sub().unwrap();
55        let mut publisher_cache = Cache::new().pub_sub().unwrap();
56
57        // Use a channel to signal when the callback is executed
58        let (tx, rx) = tokio::sync::oneshot::channel();
59        let tx = std::sync::Arc::new(std::sync::Mutex::new(Some(tx)));
60
61        let sub_clbk = move |msg: ChannelMessage| {
62            let msg2 = get_test_msg();
63            assert_eq!(msg, msg2);
64            if let Ok(mut tx_guard) = tx.lock() {
65                if let Some(sender) = tx_guard.take() {
66                    let _ = sender.send(());
67                }
68            }
69        };
70
71        // Start the subscription task by moving the cache into the spawned task
72        let _handle = tokio::spawn(async move {
73            let subscription_task =
74                subscriber_cache.subscribe(Box::new(sub_clbk), SubscriptionChannel::BajkomatApi);
75            let _ = subscription_task.await;
76        });
77
78        // Wait a moment for subscription to be established
79        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
80
81        // Publish the message
82        publisher_cache.publish(SubscriptionChannel::BajkomatApi, get_test_msg());
83
84        // Wait for the callback to be executed with timeout
85        let timeout_result = tokio::time::timeout(tokio::time::Duration::from_secs(10), rx).await;
86        match timeout_result {
87            Ok(Ok(())) => {
88                // Test passed
89            }
90            Ok(Err(e)) => {
91                panic!("Callback channel error: {:?}", e);
92            }
93            Err(error) => {
94                // Check if Redis is available - if not, skip the test
95                println!("Test timed out - Redis may not be available, skipping test");
96                return;
97            }
98        }
99    }
100}