redis_module_test_rs/
utils.rs

1use anyhow::{Context, Result};
2use std::sync::atomic::AtomicU16;
3use redis::Connection;
4use std::fs;
5use std::path::PathBuf;
6use std::process::Command;
7use std::time::Duration;
8
9pub struct TestConnection {
10    _guards: Vec<ChildGuard>,
11    connection: Connection,
12}
13
14static mut TEST_PORT: AtomicU16 = AtomicU16::new(6479);
15
16impl TestConnection {
17    pub fn new(module_name: &str) -> Self {
18        unsafe {
19            let port = TEST_PORT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
20
21            Self {
22                _guards: start_redis(module_name, port).expect("Redis instance started."),
23                connection: get_redis_connection(port).expect("Established connection to server."),
24            }
25        }
26    }
27}
28
29impl std::ops::Deref for TestConnection {
30    type Target = Connection;
31
32    fn deref(&self) -> &Self::Target {
33        &self.connection
34    }
35}
36
37impl std::ops::DerefMut for TestConnection {
38    fn deref_mut(&mut self) -> &mut Self::Target {
39        &mut self.connection
40    }
41}
42
43
/// Owns a spawned child process and terminates it when the guard is dropped, which
/// covers both a normal exit and unwinding from a panic in a failed test.
pub struct ChildGuard {
    /// Process name, used only in diagnostic messages.
    name: &'static str,
    /// Handle to the spawned process.
    child: std::process::Child,
}

impl Drop for ChildGuard {
    /// Kills the child and then waits for it; a failure of either step is printed to
    /// stdout and otherwise ignored.
    fn drop(&mut self) {
        let Self { name, child } = self;

        match child.kill() {
            Ok(()) => {}
            Err(e) => println!("Could not kill {}: {e}", name),
        }

        match child.wait() {
            Ok(_) => {}
            Err(e) => println!("Could not wait for {}: {e}", name),
        }
    }
}
60
61fn start_redis(module_name: &str, port: u16) -> Result<Vec<ChildGuard>, &'static str> {
62    Ok(vec![start_redis_server_with_module(module_name, port)
63        .map_err(|_| "failed to start redis server")?])
64}
65
66pub fn start_redis_server_with_module(module_name: &str, port: u16) -> Result<ChildGuard> {
67    let extension = if cfg!(target_os = "macos") {
68        "dylib"
69    } else {
70        "so"
71    };
72
73    let profile = if cfg!(not(debug_assertions)) {
74        "release"
75    } else {
76        "debug"
77    };
78
79    let module_path: PathBuf = [
80        std::env::current_dir()?,
81        PathBuf::from(format!(
82            "target/{profile}/lib{module_name}.{extension}"
83        )),
84    ]
85    .iter()
86    .collect();
87
88    assert!(fs::metadata(&module_path)
89        .with_context(|| format!("Loading redis module: {}", module_path.display()))?
90        .is_file());
91
92    let module_path = format!("{}", module_path.display());
93
94    let args = &[
95        "--port",
96        &port.to_string(),
97        "--loadmodule",
98        module_path.as_str(),
99        "--enable-debug-command",
100        "yes",
101    ];
102
103    let redis_server = Command::new("redis-server")
104        .args(args)
105        .spawn()
106        .map(|c| ChildGuard {
107            name: "redis-server",
108            child: c,
109        })?;
110
111    Ok(redis_server)
112}
113
114// Get connection to Redis
115pub fn get_redis_connection(port: u16) -> Result<Connection> {
116    let client = redis::Client::open(format!("redis://127.0.0.1:{port}/"))?;
117    loop {
118        let res = client.get_connection();
119        match res {
120            Ok(con) => return Ok(con),
121            Err(e) => {
122                if e.is_connection_refusal() {
123                    // Redis not ready yet, sleep and retry
124                    std::thread::sleep(Duration::from_millis(50));
125                } else {
126                    return Err(e.into());
127                }
128            }
129        }
130    }
131}