code0_flow/flow_store/
connection.rs1use redis::aio::MultiplexedConnection;
2use redis::Client;
3use std::sync::Arc;
4use tokio::sync::Mutex;
5
6pub type FlowStore = Arc<Mutex<Box<MultiplexedConnection>>>;
7
8pub fn build_connection(redis_url: String) -> Client {
9 match Client::open(redis_url) {
10 Ok(client) => client,
11 Err(con_error) => panic!(
12 "Cannot create Client (Redis) connection! Reason: {}",
13 con_error
14 ),
15 }
16}
17
18pub async fn create_flow_store_connection(url: String) -> FlowStore {
19 let client = match build_connection(url)
20 .get_multiplexed_async_connection()
21 .await
22 {
23 Ok(connection) => connection,
24 Err(error) => panic!(
25 "Cannot create FlowStore (Redis) connection! Reason: {}",
26 error
27 ),
28 };
29
30 Arc::new(Mutex::new(Box::new(client)))
31}
32
33#[cfg(test)]
34mod tests {
35 use crate::flow_store::connection::create_flow_store_connection;
36 use redis::{AsyncCommands, RedisResult};
37 use serial_test::serial;
38 use testcontainers::core::IntoContainerPort;
39 use testcontainers::core::WaitFor;
40 use testcontainers::runners::AsyncRunner;
41 use testcontainers::GenericImage;
42
43 macro_rules! redis_container_test {
44 ($test_name:ident, $consumer:expr) => {
45 #[tokio::test]
46 #[serial]
47 async fn $test_name() {
48 let port: u16 = 6379;
49 let image_name = "redis";
50 let wait_message = "Ready to accept connections";
51
52 let container = GenericImage::new(image_name, "latest")
53 .with_exposed_port(port.tcp())
54 .with_wait_for(WaitFor::message_on_stdout(wait_message))
55 .start()
56 .await
57 .unwrap();
58
59 let host = container.get_host().await.unwrap();
60 let host_port = container.get_host_port_ipv4(port).await.unwrap();
61 let url = format!("redis://{host}:{host_port}");
62
63 $consumer(url).await;
64
65 let _ = container.stop().await;
66 }
67 };
68 }
69
70 redis_container_test!(
71 test_redis_startup,
72 (|url: String| async move {
73 println!("Redis server started correctly on: {}", url);
74 })
75 );
76
77 redis_container_test!(
78 test_redis_ping,
79 (|url: String| async move {
80 println!("Redis server started correctly on: {}", url.clone());
81
82 let flow_store = create_flow_store_connection(url.clone()).await;
83 let mut con = flow_store.lock().await;
84
85 let ping_res: RedisResult<String> = con.ping().await;
86 assert!(ping_res.is_ok());
87 assert_eq!(ping_res.unwrap(), "PONG");
88 })
89 );
90}