code0_flow/flow_store/
connection.rs1use redis::Client;
2use redis::aio::MultiplexedConnection;
3use std::sync::Arc;
4use tokio::sync::Mutex;
5
6pub type FlowStore = Arc<Mutex<Box<MultiplexedConnection>>>;
7
8pub fn build_connection(redis_url: String) -> Client {
9 match Client::open(redis_url) {
10 Ok(client) => {
11 log::info!("Successfully created connection to the FlowStore (Redis)");
12 client
13 }
14 Err(con_error) => {
15 log::error!(
16 "Cannot create FlowStoreClient (Redis) connection! Reason: {:?}",
17 con_error
18 );
19 panic!("Failed to create Redis client")
20 }
21 }
22}
23
24pub async fn create_flow_store_connection(url: String) -> FlowStore {
25 let client = match build_connection(url)
26 .get_multiplexed_async_connection()
27 .await
28 {
29 Ok(connection) => {
30 log::info!("Successfully created connection to the FlowStore (Redis)");
31 connection
32 }
33 Err(con_error) => {
34 log::error!(
35 "Cannot create FlowStoreClient (Redis) connection! Reason: {:?}",
36 con_error
37 );
38 panic!("Failed to create Redis client")
39 }
40 };
41
42 Arc::new(Mutex::new(Box::new(client)))
43}
44
45#[cfg(test)]
46mod tests {
47 use crate::flow_store::connection::create_flow_store_connection;
48 use redis::{AsyncCommands, RedisResult};
49 use serial_test::serial;
50 use testcontainers::GenericImage;
51 use testcontainers::core::IntoContainerPort;
52 use testcontainers::core::WaitFor;
53 use testcontainers::runners::AsyncRunner;
54
55 macro_rules! redis_container_test {
56 ($test_name:ident, $consumer:expr) => {
57 #[tokio::test]
58 #[serial]
59 async fn $test_name() {
60 let port: u16 = 6379;
61 let image_name = "redis";
62 let wait_message = "Ready to accept connections";
63
64 let container = GenericImage::new(image_name, "latest")
65 .with_exposed_port(port.tcp())
66 .with_wait_for(WaitFor::message_on_stdout(wait_message))
67 .start()
68 .await
69 .unwrap();
70
71 let host = container.get_host().await.unwrap();
72 let host_port = container.get_host_port_ipv4(port).await.unwrap();
73 let url = format!("redis://{host}:{host_port}");
74
75 $consumer(url).await;
76
77 let _ = container.stop().await;
78 }
79 };
80 }
81
82 redis_container_test!(
83 test_redis_startup,
84 (|url: String| async move {
85 println!("Redis server started correctly on: {}", url);
86 })
87 );
88
89 redis_container_test!(
90 test_redis_ping,
91 (|url: String| async move {
92 println!("Redis server started correctly on: {}", url.clone());
93
94 let flow_store = create_flow_store_connection(url.clone()).await;
95 let mut con = flow_store.lock().await;
96
97 let ping_res: RedisResult<String> = con.ping().await;
98 assert!(ping_res.is_ok());
99 assert_eq!(ping_res.unwrap(), "PONG");
100 })
101 );
102}