code0_flow/flow_store/
connection.rs

1use redis::Client;
2use redis::aio::MultiplexedConnection;
3use std::sync::Arc;
4use tokio::sync::Mutex;
5
6pub type FlowStore = Arc<Mutex<Box<MultiplexedConnection>>>;
7
8pub fn build_connection(redis_url: String) -> Client {
9    match Client::open(redis_url) {
10        Ok(client) => {
11            log::info!("Successfully created connection to the FlowStore (Redis)");
12            client
13        }
14        Err(con_error) => {
15            log::error!(
16                "Cannot create FlowStoreClient (Redis) connection! Reason: {:?}",
17                con_error
18            );
19            panic!("Failed to create Redis client")
20        }
21    }
22}
23
24pub async fn create_flow_store_connection(url: String) -> FlowStore {
25    let client = match build_connection(url)
26        .get_multiplexed_async_connection()
27        .await
28    {
29        Ok(connection) => {
30            log::info!("Successfully created connection to the FlowStore (Redis)");
31            connection
32        }
33        Err(con_error) => {
34            log::error!(
35                "Cannot create FlowStoreClient (Redis) connection! Reason: {:?}",
36                con_error
37            );
38            panic!("Failed to create Redis client")
39        }
40    };
41
42    Arc::new(Mutex::new(Box::new(client)))
43}
44
45#[cfg(test)]
46mod tests {
47    use crate::flow_store::connection::create_flow_store_connection;
48    use redis::{AsyncCommands, RedisResult};
49    use serial_test::serial;
50    use testcontainers::GenericImage;
51    use testcontainers::core::IntoContainerPort;
52    use testcontainers::core::WaitFor;
53    use testcontainers::runners::AsyncRunner;
54
55    macro_rules! redis_container_test {
56        ($test_name:ident, $consumer:expr) => {
57            #[tokio::test]
58            #[serial]
59            async fn $test_name() {
60                let port: u16 = 6379;
61                let image_name = "redis";
62                let wait_message = "Ready to accept connections";
63
64                let container = GenericImage::new(image_name, "latest")
65                    .with_exposed_port(port.tcp())
66                    .with_wait_for(WaitFor::message_on_stdout(wait_message))
67                    .start()
68                    .await
69                    .unwrap();
70
71                let host = container.get_host().await.unwrap();
72                let host_port = container.get_host_port_ipv4(port).await.unwrap();
73                let url = format!("redis://{host}:{host_port}");
74
75                $consumer(url).await;
76
77                let _ = container.stop().await;
78            }
79        };
80    }
81
82    redis_container_test!(
83        test_redis_startup,
84        (|url: String| async move {
85            println!("Redis server started correctly on: {}", url);
86        })
87    );
88
89    redis_container_test!(
90        test_redis_ping,
91        (|url: String| async move {
92            println!("Redis server started correctly on: {}", url.clone());
93
94            let flow_store = create_flow_store_connection(url.clone()).await;
95            let mut con = flow_store.lock().await;
96
97            let ping_res: RedisResult<String> = con.ping().await;
98            assert!(ping_res.is_ok());
99            assert_eq!(ping_res.unwrap(), "PONG");
100        })
101    );
102}