code0_flow/flow_store/
connection.rs

1use redis::aio::MultiplexedConnection;
2use redis::Client;
3use std::sync::Arc;
4use tokio::sync::Mutex;
5
6pub type FlowStore = Arc<Mutex<Box<MultiplexedConnection>>>;
7
8pub fn build_connection(redis_url: String) -> Client {
9    match Client::open(redis_url) {
10        Ok(client) => client,
11        Err(con_error) => panic!(
12            "Cannot create Client (Redis) connection! Reason: {}",
13            con_error
14        ),
15    }
16}
17
18pub async fn create_flow_store_connection(url: String) -> FlowStore {
19    let client = match build_connection(url)
20        .get_multiplexed_async_connection()
21        .await
22    {
23        Ok(connection) => connection,
24        Err(error) => panic!(
25            "Cannot create FlowStore (Redis) connection! Reason: {}",
26            error
27        ),
28    };
29
30    Arc::new(Mutex::new(Box::new(client)))
31}
32
#[cfg(test)]
mod tests {
    use super::create_flow_store_connection;
    use redis::{AsyncCommands, RedisResult};
    use serial_test::serial;
    use testcontainers::core::{IntoContainerPort, WaitFor};
    use testcontainers::runners::AsyncRunner;
    use testcontainers::GenericImage;

    /// Port the Redis server listens on inside the container.
    const REDIS_PORT: u16 = 6379;

    /// Generates a serialized async test named `$test_name` that starts a
    /// `redis:latest` container, waits until it logs that it is ready, and
    /// then hands the `redis://host:port` URL of the mapped port to
    /// `$consumer` (a callable taking a `String` and returning a future).
    macro_rules! redis_container_test {
        ($test_name:ident, $consumer:expr) => {
            #[tokio::test]
            #[serial]
            async fn $test_name() {
                let redis = GenericImage::new("redis", "latest")
                    .with_exposed_port(REDIS_PORT.tcp())
                    .with_wait_for(WaitFor::message_on_stdout(
                        "Ready to accept connections",
                    ))
                    .start()
                    .await
                    .unwrap();

                // The container port is mapped to a host port; resolve both
                // halves of the address before building the URL.
                let host = redis.get_host().await.unwrap();
                let mapped_port = redis.get_host_port_ipv4(REDIS_PORT).await.unwrap();
                let url = format!("redis://{host}:{mapped_port}");

                $consumer(url).await;

                // Best-effort shutdown: a failure to stop is deliberately ignored.
                let _ = redis.stop().await;
            }
        };
    }

    // Smoke test: the container itself comes up and exposes a URL.
    redis_container_test!(
        test_redis_startup,
        (|url: String| async move {
            println!("Redis server started correctly on: {url}");
        })
    );

    // A flow store connection can be opened and answers PING with PONG.
    redis_container_test!(
        test_redis_ping,
        (|url: String| async move {
            println!("Redis server started correctly on: {url}");

            let store = create_flow_store_connection(url).await;
            let mut connection = store.lock().await;

            let reply: RedisResult<String> = connection.ping().await;
            assert!(reply.is_ok());
            assert_eq!(reply.unwrap(), "PONG");
        })
    );
}