use std::thread;
use std::time::Duration;
use crate::utils::get_module_path;
use anyhow::Context;
use anyhow::Result;
use redis::Commands;
use redis::Value;
use redis::{RedisError, RedisResult};
use utils::{
check_auth, check_blocked_clients, get_valkey_connection, setup_acl_users,
start_valkey_server_with_module, AuthExpectedResult,
};
/// Error context attached when `start_valkey_server_with_module` fails.
const FAILED_TO_START_SERVER: &str = "failed to start valkey server";
/// Error context attached when `get_valkey_connection` fails.
const FAILED_TO_CONNECT_TO_SERVER: &str = "failed to connect to valkey server";
mod utils;
/// Loads the `hello` module and exercises `hello.mul`: integer arguments must
/// come back followed by their product, and a non-numeric argument must be
/// rejected with an error.
#[test]
fn test_hello() -> Result<()> {
    let port: u16 = 6479;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("hello", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let product: Vec<i32> = redis::cmd("hello.mul")
        .arg(&[3, 4])
        .query(&mut con)
        .context("failed to run hello.mul")?;
    assert_eq!(product, vec![3, 4, 12]);

    // "xx" is not a number, so the command is expected to fail.
    let invalid: Result<Vec<i32>, RedisError> =
        redis::cmd("hello.mul").arg(&["3", "xx"]).query(&mut con);
    anyhow::ensure!(invalid.is_err(), "Should return an error");

    Ok(())
}
/// Loads the `keys_pos` module and checks that `keys_pos` returns the key
/// arguments of a key/value argument list and errors on an odd-length list.
#[test]
fn test_keys_pos() -> Result<()> {
    let port: u16 = 6480;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("keys_pos", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let keys: Vec<String> = redis::cmd("keys_pos")
        .arg(&["a", "1", "b", "2"])
        .query(&mut con)
        .context("failed to run keys_pos")?;
    assert_eq!(keys, vec!["a", "b"]);

    // The trailing key "b" has no value, so the command is expected to fail.
    let unbalanced: Result<Vec<String>, RedisError> =
        redis::cmd("keys_pos").arg(&["a", "1", "b"]).query(&mut con);
    anyhow::ensure!(unbalanced.is_err(), "Should return an error");

    Ok(())
}
/// Loads the `test_helper` module and checks that `test_helper.version`
/// reports a positive first component and that `test_helper.name` echoes the
/// command name.
#[test]
fn test_helper_version() -> Result<()> {
    let port: u16 = 6481;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("test_helper", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let version: Vec<i64> = redis::cmd("test_helper.version")
        .query(&mut con)
        .context("failed to run test_helper.version")?;
    assert!(version[0] > 0);

    let command_name: String = redis::cmd("test_helper.name")
        .query(&mut con)
        .context("failed to run test_helper.name")?;
    assert_eq!(command_name, "test_helper.name");

    Ok(())
}
/// Checks `test_helper.name` against the server version reported by
/// `INFO server`: from 6.2.5 onwards the command must return its own name,
/// on older servers it must fail with a "not available" error.
#[test]
fn test_command_name() -> Result<()> {
    use valkey_module::ValkeyValue;

    let port: u16 = 6482;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("test_helper", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    // Deliberately not unwrapped here: whether this should succeed depends on
    // the server version checked below.
    let name: Result<String, RedisError> = redis::cmd("test_helper.name").query(&mut con);

    let info: String = redis::cmd("info")
        .arg("server")
        .query(&mut con)
        .context("failed to run info server")?;

    // NOTE(review): if the version cannot be parsed from the INFO output the
    // test passes without asserting anything — confirm that is intended.
    if let Ok(ver) = valkey_module::Context::version_from_info(ValkeyValue::SimpleString(info)) {
        // Lexicographic tuple comparison is equivalent to "version >= 6.2.5".
        if (ver.major, ver.minor, ver.patch) >= (6, 2, 5) {
            assert_eq!(name.unwrap(), String::from("test_helper.name"));
        } else {
            assert!(name
                .unwrap_err()
                .to_string()
                .contains("RedisModule_GetCurrentCommandName is not available"));
        }
    }

    Ok(())
}
/// For each listed module, starts a server with that module and checks that
/// `INFO <module>` contains `<module>_field:value`, plus a
/// `dictionary:key=value` entry for the modules flagged as having one.
#[test]
fn test_helper_info() -> Result<()> {
    // The same port is reused; each server handle is dropped at the end of its
    // loop iteration before the next server is started.
    const PORT: u16 = 6483;
    // (module name, whether its INFO output includes the dictionary entry)
    const MODULES: [(&str, bool); 4] = [
        ("test_helper", false),
        ("info_handler_macro", false),
        ("info_handler_builder", true),
        ("info_handler_struct", true),
    ];

    for (module, has_dictionary) in MODULES {
        let _server =
            start_valkey_server_with_module(module, PORT).context(FAILED_TO_START_SERVER)?;
        let mut con = get_valkey_connection(PORT).context(FAILED_TO_CONNECT_TO_SERVER)?;

        let info: String = redis::cmd("INFO")
            .arg(module)
            .query(&mut con)
            .with_context(|| format!("failed to run INFO {module}"))?;

        assert!(info.contains(&format!("{module}_field:value")));
        if has_dictionary {
            assert!(info.contains("dictionary:key=value"));
        }
    }

    Ok(())
}
/// Requests only the `<module>_InfoSection2` INFO section and checks that the
/// reply contains the second field but not the first one.
#[test]
fn test_info_handler_multiple_sections() -> Result<()> {
    const PORT: u16 = 6500;
    const MODULES: [&str; 1] = ["info_handler_multiple_sections"];

    for module in MODULES {
        let _server =
            start_valkey_server_with_module(module, PORT).context(FAILED_TO_START_SERVER)?;
        let mut con = get_valkey_connection(PORT).context(FAILED_TO_CONNECT_TO_SERVER)?;

        let section: String = redis::cmd("INFO")
            .arg(format!("{module}_InfoSection2"))
            .query(&mut con)
            .with_context(|| format!("failed to run INFO {module}"))?;

        let second_field = format!("{module}_field_2:value2");
        let first_field = format!("{module}_field_1:value1");
        assert!(section.contains(&second_field));
        assert!(!section.contains(&first_field));
    }

    Ok(())
}
/// Sends `test_helper.err` with payloads containing NUL bytes and ignores the
/// replies; the test only fails if the server cannot be started or reached.
///
/// NOTE(review): the server is started with the `hello` module while the
/// command is named `test_helper.err` — confirm this is the intended module.
#[test]
fn test_test_helper_err() -> Result<()> {
    let port: u16 = 6484;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("hello", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    for payload in ["\x00\x00", "no crash\x00"] {
        // The reply (success or error) is intentionally discarded.
        let _ = redis::cmd("test_helper.err")
            .arg(&[payload])
            .query::<()>(&mut con);
    }

    Ok(())
}
/// Loads the `string` module, stores a value with `string.set` and reads it
/// back with `string.get`.
#[test]
fn test_string() -> Result<()> {
    let port: u16 = 6485;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("string", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    redis::cmd("string.set")
        .arg("key")
        .arg("value")
        .exec(&mut con)
        .context("failed to run string.set")?;

    let stored: String = redis::cmd("string.get").arg("key").query(&mut con)?;
    assert_eq!(stored, "value");

    Ok(())
}
/// Loads the `scan_keys` module, creates keys `x` and `y` with plain `SET`,
/// and checks that `scan_keys` returns exactly those two keys.
#[test]
fn test_scan() -> Result<()> {
    let port: u16 = 6486;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("scan_keys", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    for key in ["x", "y"] {
        redis::cmd("set")
            .arg(&[key, "1"])
            .exec(&mut con)
            .with_context(|| format!("failed to run set {key}"))?;
    }

    let mut keys: Vec<String> = redis::cmd("scan_keys")
        .query(&mut con)
        .context("failed to run scan_keys")?;
    // Sorted before comparing so the check does not depend on reply order.
    keys.sort();
    assert_eq!(&keys, &["x", "y"]);

    Ok(())
}
/// Loads the `stream` module, appends two entries to stream `s`, and checks
/// that `STREAM_POP` returns their ids in insertion order and leaves the
/// stream empty.
#[test]
fn test_stream_reader() -> Result<()> {
    // Explicit entry ids, in the order they are added and expected back.
    const ENTRY_IDS: [&str; 2] = ["1-1", "1-2"];

    let port: u16 = 6487;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("stream", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    for id in ENTRY_IDS {
        let _: String = redis::cmd("XADD")
            .arg(&["s", id, "foo", "bar"])
            .query(&mut con)
            .context("failed to add data to the stream")?;
    }

    for expected_id in ENTRY_IDS {
        let popped: String = redis::cmd("STREAM_POP")
            .arg("s")
            .query(&mut con)
            .context("failed to run STREAM_POP")?;
        assert_eq!(popped, expected_id);
    }

    let remaining: usize = redis::cmd("XLEN")
        .arg("s")
        .query(&mut con)
        .context("failed to run XLEN")?;
    assert_eq!(remaining, 0);

    Ok(())
}
/// Loads the `call` module and expects `call.test` to reply with `pass`.
#[test]
fn test_call() -> Result<()> {
    let port: u16 = 6488;
    // Bound to a name so the server handle lives until the end of the test.
    let _server = start_valkey_server_with_module("call", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let outcome: String = redis::cmd("call.test")
        .query(&mut con)
        .context("failed to run call.test")?;
    assert_eq!(outcome, "pass");

    Ok(())
}
/// Loads the `ctx_flags` module and expects `my_role` to report `master`.
#[test]
fn test_ctx_flags() -> Result<()> {
    let port: u16 = 6489;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("ctx_flags", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let role: String = redis::cmd("my_role").query(&mut con)?;
    assert_eq!(role, "master");

    Ok(())
}
/// Loads the `acl` module and expects `get_current_user` to report `default`
/// for a freshly opened connection.
#[test]
fn test_get_current_user() -> Result<()> {
    let port: u16 = 6490;
    // Bound to a name so the server handle lives until the end of the test.
    let _server = start_valkey_server_with_module("acl", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let user: String = redis::cmd("get_current_user").query(&mut con)?;
    assert_eq!(user, "default");

    Ok(())
}
/// Checks `verify_key_access_for_user`: the `default` user may access any key,
/// while a user restricted to `cached:*` is accepted for a matching key and
/// rejected with a permissions error for a non-matching one.
#[test]
fn test_verify_acl_on_user() -> Result<()> {
    let port: u16 = 6491;
    // Bound to a name so the server handle lives until the end of the test.
    let _server = start_valkey_server_with_module("acl", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let default_access: String = redis::cmd("verify_key_access_for_user")
        .arg("default")
        .arg("x")
        .query(&mut con)?;
    assert_eq!(default_access, "OK");

    // Create `alice`, limited to GET on keys matching `cached:*`.
    let created: String = redis::cmd("ACL")
        .arg(&["SETUSER", "alice", "on", ">pass", "~cached:*", "+get"])
        .query(&mut con)?;
    assert_eq!(created, "OK");

    let allowed: String = redis::cmd("verify_key_access_for_user")
        .arg("alice")
        .arg("cached:1")
        .query(&mut con)?;
    assert_eq!(allowed, "OK");

    let denied: RedisResult<String> = redis::cmd("verify_key_access_for_user")
        .arg("alice")
        .arg("not_allow")
        .query(&mut con);
    assert!(denied.is_err());
    if let Some(err) = denied.err() {
        assert_eq!(
            err.to_string(),
            "Err: User does not have permissions on key"
        );
    }

    Ok(())
}
/// Loads the `events` module and checks that a `GET` on a missing key bumps
/// `events.num_key_miss`, and that after a `SET` the key `num_sets` holds `1`.
#[test]
fn test_key_space_notifications() -> Result<()> {
    let port: u16 = 6492;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("events", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let misses_before: usize = redis::cmd("events.num_key_miss").query(&mut con)?;
    assert_eq!(misses_before, 0);

    // Reading a key that does not exist yet.
    redis::cmd("GET").arg("x").exec(&mut con)?;
    let misses_after: usize = redis::cmd("events.num_key_miss").query(&mut con)?;
    assert_eq!(misses_after, 1);

    let _: String = redis::cmd("SET").arg("x").arg("1").query(&mut con)?;
    let num_sets: String = redis::cmd("GET").arg("num_sets").query(&mut con)?;
    assert_eq!(num_sets, "1");

    Ok(())
}
/// Loads the `threads` module, stores a value with `set_static_data`, and
/// checks that both `get_static_data` and `get_static_data_on_thread` return it.
#[test]
fn test_context_mutex() -> Result<()> {
    let port: u16 = 6493;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("threads", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let stored: String = redis::cmd("set_static_data").arg("foo").query(&mut con)?;
    assert_eq!(stored, "OK");

    for getter in ["get_static_data", "get_static_data_on_thread"] {
        let value: String = redis::cmd(getter).query(&mut con)?;
        assert_eq!(value, "foo");
    }

    Ok(())
}
/// Loads the `server_events` module and checks the counters it exposes for
/// event-loop, flush, config-change, cron, key and persistence events.
#[test]
fn test_server_event() -> Result<()> {
    /// Queries a zero-argument command that replies with an integer counter.
    fn counter(con: &mut redis::Connection, name: &str) -> Result<i64> {
        Ok(redis::cmd(name).query(con)?)
    }

    let port: u16 = 6494;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("server_events", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    // Event-loop counters must have grown after a short wait.
    let before_sleep_start = counter(&mut con, "num_event_loop_before_sleep")?;
    let after_sleep_start = counter(&mut con, "num_event_loop_after_sleep")?;
    thread::sleep(Duration::from_millis(50));
    let before_sleep_end = counter(&mut con, "num_event_loop_before_sleep")?;
    let after_sleep_end = counter(&mut con, "num_event_loop_after_sleep")?;
    assert!(before_sleep_end > before_sleep_start);
    assert!(after_sleep_end > after_sleep_start);

    // Each FLUSHALL is expected to bump the flush counter by one.
    for expected_flushes in [1, 2] {
        redis::cmd("flushall")
            .exec(&mut con)
            .context("failed to run flushall")?;
        assert_eq!(counter(&mut con, "num_flushed")?, expected_flushes);
    }

    // Each maxmemory change is expected to bump the config-change counter.
    for (value, expected_changes) in [("1", 1), ("0", 2)] {
        redis::cmd("config")
            .arg(&["set", "maxmemory", value])
            .exec(&mut con)
            .context("failed to run config set maxmemory")?;
        assert_eq!(counter(&mut con, "num_max_memory_changes")?, expected_changes);
    }

    assert!(counter(&mut con, "num_crons")? > 0);

    // Two sets and one delete of the same key; the module is expected to
    // report two key events.
    for value in ["value", "new_value"] {
        redis::cmd("set")
            .arg(&["key", value])
            .exec(&mut con)
            .context("failed to do set")?;
    }
    redis::cmd("del")
        .arg("key")
        .exec(&mut con)
        .context("failed to do del")?;
    assert_eq!(counter(&mut con, "num_key_events")?, 2);

    redis::cmd("bgsave")
        .exec(&mut con)
        .context("failed to run bgsave")?;
    // Fixed pause before reading the persistence counter.
    thread::sleep(Duration::from_millis(100));
    let persistence_events_after_rdb = counter(&mut con, "num_persistence_events")?;
    assert_eq!(persistence_events_after_rdb, 4);

    Ok(())
}
/// Issues `SHUTDOWN NOSAVE` against a server running `server_events` and
/// checks that `shutdown_log.txt` then contains the shutdown callback message.
#[test]
fn test_server_event_shutdown() -> Result<()> {
    const SHUTDOWN_LOG: &str = "shutdown_log.txt";

    let port: u16 = 6512;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("server_events", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    // Remove a leftover log so the assertion below only sees this run's output.
    let log_path = std::path::Path::new(SHUTDOWN_LOG);
    if log_path.exists() {
        std::fs::remove_file(log_path).context("failed to remove shutdown log file")?;
    }

    // The reply is ignored on purpose.
    let _ = redis::cmd("SHUTDOWN")
        .arg("NOSAVE")
        .query::<String>(&mut con);

    // Fixed pause before the log file is read.
    thread::sleep(Duration::from_secs(1));

    let contents =
        std::fs::read_to_string(log_path).context("failed to read shutdown log file")?;
    assert!(contents.contains("Server shutdown callback event ..."));

    std::fs::remove_file(log_path).context("failed to remove shutdown log file")?;

    Ok(())
}
/// Opens two connections to a server running `server_events` and checks that
/// `num_connects` reports 2, then 1 after the second connection is dropped.
#[test]
fn test_client_change_event() -> Result<()> {
    let port: u16 = 6511;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("server_events", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;
    let second_con: redis::Connection =
        get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let connected: i64 = redis::cmd("num_connects").query(&mut con)?;
    println!("Connection result: {connected}");
    assert_eq!(connected, 2);

    // Closing the second connection should be reflected in the counter.
    drop(second_con);

    let remaining: i64 = redis::cmd("num_connects").query(&mut con)?;
    println!("Disconnection result: {remaining}");
    assert_eq!(remaining, 1);

    Ok(())
}
/// Exercises the `configuration` module through `CONFIG GET` / `CONFIG SET`:
///
/// 1. every plain config starts at its default and accepts a new value;
/// 2. the `reject_*` configs refuse specific values with a validation error;
/// 3. `configuration.num_changes` reports 26 at that point;
/// 4. the `reject_*` configs accept other values;
/// 5. `configuration.num_changes` then reports 28.
#[test]
fn test_configuration() -> Result<()> {
    // (config name, expected default, value to set)
    const ACCEPTED: [(&str, &str, &str); 9] = [
        ("configuration.i64", "10", "100"),
        ("configuration.atomic_i64", "10", "100"),
        ("configuration.valkey_string", "default", "new"),
        ("configuration.string", "default", "new"),
        ("configuration.mutex_string", "default", "new"),
        ("configuration.atomic_bool", "yes", "no"),
        ("configuration.bool", "yes", "no"),
        ("configuration.enum", "Val1", "Val2"),
        ("configuration.enum_mutex", "Val1", "Val2"),
    ];
    // (config name, value that must be refused, expected error fragment)
    const REJECTED: [(&str, &str, &str); 4] = [
        (
            "configuration.reject_valkey_string",
            "rejectvalue",
            "Rejected from custom string validation",
        ),
        (
            "configuration.reject_i64",
            "123",
            "Rejected from custom i64 validation",
        ),
        (
            "configuration.reject_bool",
            "no",
            "Rejected from custom bool validation",
        ),
        (
            "configuration.reject_enum",
            "Val2",
            "Rejected from custom enum validation",
        ),
    ];
    // (config name, expected current value, value that must be accepted)
    const VALIDATED: [(&str, &str, &str); 4] = [
        ("configuration.reject_valkey_string", "default", "validvalue"),
        ("configuration.reject_i64", "10", "11"),
        ("configuration.reject_bool", "yes", "yes"),
        ("configuration.reject_enum", "Val1", "Val1"),
    ];

    let port: u16 = 6495;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("configuration", port).context(FAILED_TO_START_SERVER)?;

    // Each helper opens its own connection, as the original test did.
    let config_get = |config: &str| -> Result<String> {
        let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;
        let reply: Vec<String> = redis::cmd("config")
            .arg(&["get", config])
            .query(&mut con)
            .context("failed to run config get")?;
        // The value is read from the second reply element; report a missing
        // value as an error instead of panicking on an out-of-bounds index.
        reply
            .into_iter()
            .nth(1)
            .with_context(|| format!("config get {config} returned no value"))
    };
    let config_set = |config: &str, val: &str| -> Result<()> {
        let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;
        let reply: String = redis::cmd("config")
            .arg(&["set", config, val])
            .query(&mut con)
            .map_err(|e| anyhow::anyhow!("Failed to run config set: {}", e))?;
        assert_eq!(reply, "OK");
        Ok(())
    };
    let num_changes = || -> Result<i64> {
        let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;
        redis::cmd("configuration.num_changes")
            .query(&mut con)
            .context("failed to run configuration.num_changes")
    };

    for (name, initial, updated) in ACCEPTED {
        assert_eq!(config_get(name)?, initial, "unexpected default for {name}");
        config_set(name, updated)?;
        assert_eq!(
            config_get(name)?,
            updated,
            "unexpected value for {name} after set"
        );
    }

    for (name, value, reason) in REJECTED {
        let err = config_set(name, value).unwrap_err();
        assert!(
            err.to_string().contains(reason),
            "unexpected rejection message for {name}: {err}"
        );
    }

    assert_eq!(num_changes()?, 26);

    for (name, current, updated) in VALIDATED {
        assert_eq!(config_get(name)?, current, "unexpected value for {name}");
        config_set(name, updated)?;
        assert_eq!(
            config_get(name)?,
            updated,
            "unexpected value for {name} after set"
        );
    }

    assert_eq!(num_changes()?, 28);

    Ok(())
}
/// Loads the `response` module, fills hash `k`, and checks the replies of
/// `map.mget` and `map.unique` for fields `a`, `c` and `e`.
#[test]
fn test_response() -> Result<()> {
    let port: u16 = 6496;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("response", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    // Hash contents: a=b, c=d, e=b, f=g.
    redis::cmd("hset")
        .arg(&["k", "a", "b", "c", "d", "e", "b", "f", "g"])
        .exec(&mut con)
        .context("failed to run hset")?;

    let mut pairs: Vec<String> = redis::cmd("map.mget")
        .arg(&["k", "a", "c", "e"])
        .query(&mut con)
        .context("failed to run map.mget")?;
    // Sorted before comparing so the check does not depend on reply order.
    pairs.sort();
    assert_eq!(&pairs, &["a", "b", "b", "c", "d", "e"]);

    let mut unique_values: Vec<String> = redis::cmd("map.unique")
        .arg(&["k", "a", "c", "e"])
        .query(&mut con)
        .context("failed to run map.unique")?;
    unique_values.sort();
    assert_eq!(&unique_values, &["b", "d"]);

    Ok(())
}
/// Loads the `proc_macro_commands` module and uses `COMMAND GETKEYS` to check
/// which arguments are reported as keys for `classic_keys`, `keyword_keys`
/// and `num_keys`.
#[test]
fn test_command_proc_macro() -> Result<()> {
    let port: u16 = 6497;
    // Bound to a name so the server handle lives until the end of the test.
    let _server = start_valkey_server_with_module("proc_macro_commands", port)
        .context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    // Runs `COMMAND GETKEYS <command line>` and returns the reported keys.
    let mut get_keys = |command_line: &[&str]| -> Result<Vec<String>> {
        redis::cmd("COMMAND")
            .arg("GETKEYS")
            .arg(command_line)
            .query(&mut con)
            .context("failed to run command getkeys")
    };

    let classic = get_keys(&["classic_keys", "x", "foo", "y", "bar"])?;
    assert_eq!(&classic, &["x", "y"]);

    let keyword = get_keys(&["keyword_keys", "foo", "x", "1", "y", "2"])?;
    assert_eq!(&keyword, &["x", "y"]);

    let counted = get_keys(&["num_keys", "3", "x", "y", "z", "foo", "bar"])?;
    assert_eq!(&counted, &["x", "y", "z"]);

    let none = get_keys(&["num_keys", "0", "foo", "bar"])?;
    assert!(none.is_empty());

    Ok(())
}
/// Loads the `proc_macro_commands` module and checks `valkey_value_derive`:
/// without arguments the reply is a sequence of 22 elements, with the `test`
/// argument it is `OK`.
#[test]
fn test_valkey_value_derive() -> Result<()> {
    let port: u16 = 6498;
    // Bound to a name so the server handle lives until the end of the test.
    let _server = start_valkey_server_with_module("proc_macro_commands", port)
        .context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let derived: Value = redis::cmd("valkey_value_derive")
        .query(&mut con)
        .context("failed to run valkey_value_derive")?;
    let elements = derived.as_sequence().unwrap();
    assert_eq!(elements.len(), 22);

    let status: String = redis::cmd("valkey_value_derive")
        .arg("test")
        .query(&mut con)
        .context("failed to run valkey_value_derive")?;
    assert_eq!(status, "OK");

    Ok(())
}
/// Loads the `call` module and expects both `call.blocking` and
/// `call.blocking_from_detached_ctx` to reply with nil.
#[test]
fn test_call_blocking() -> Result<()> {
    let port: u16 = 6499;
    // Bound to a name so the server handle lives until the end of the test.
    let _server = start_valkey_server_with_module("call", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    for command in ["call.blocking", "call.blocking_from_detached_ctx"] {
        let reply: Option<String> = redis::cmd(command)
            .query(&mut con)
            .with_context(|| format!("failed to run {command}"))?;
        assert_eq!(reply, None);
    }

    Ok(())
}
/// Loads the `open_key_with_flags` module and, with active expiry disabled,
/// checks that running `open_key_with_flags.write` / `.read` on a key whose
/// TTL has already elapsed leaves the server's `expired_keys` statistic at 0.
#[test]
fn test_open_key_with_flags() -> Result<()> {
    let port: u16 = 6501;
    // Bound to a name so the server handle lives until the end of the test.
    let _server = start_valkey_server_with_module("open_key_with_flags", port)
        .context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    // Disable active expiry so only key access could expire the key.
    redis::cmd("DEBUG")
        .arg(&["SET-ACTIVE-EXPIRE", "0"])
        .exec(&mut con)
        .context("failed to run DEBUG SET-ACTIVE-EXPIRE")?;

    for cmd in ["open_key_with_flags.write", "open_key_with_flags.read"] {
        redis::cmd("set")
            .arg(&["x", "1"])
            .exec(&mut con)
            .context("failed to run set")?;
        redis::cmd("pexpire")
            .arg(&["x", "1"])
            .exec(&mut con)
            .context("failed to run pexpire")?;

        // Wait well past the 1 ms TTL set above.
        thread::sleep(Duration::from_millis(500));

        let res = redis::cmd(cmd).arg(&["x"]).query(&mut con);
        assert_eq!(res, Ok(()));

        let stats: String = redis::cmd("info").arg("stats").query(&mut con)?;
        // Parse the whole `expired_keys:<n>` line rather than only the first
        // digit after the colon; a missing field is reported as an error.
        let expired_keys = stats
            .lines()
            .find_map(|line| line.strip_prefix("expired_keys:"))
            .and_then(|value| value.trim().parse::<u64>().ok())
            .context("expired_keys not found in INFO stats")?;
        assert_eq!(expired_keys, 0);

        // Reset state so the next command starts from a clean slate.
        redis::cmd("del").arg("x").exec(&mut con)?;
        redis::cmd("config").arg("RESETSTAT").exec(&mut con)?;
    }

    Ok(())
}
/// Loads the `expire` module and checks `expire.cmd` through `TTL`: a fresh
/// key reports -1, a positive expiry yields a positive TTL, and `-1` brings
/// the TTL back to -1.
#[test]
fn test_expire() -> Result<()> {
    /// Returns the reply of `TTL <key>`.
    fn ttl_of(con: &mut redis::Connection, key: &str) -> Result<i64> {
        Ok(redis::cmd("ttl").arg(key).query(con)?)
    }

    let port: u16 = 6502;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("expire", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    redis::cmd("set")
        .arg("key")
        .arg("value")
        .exec(&mut con)
        .context("failed to run set")?;
    assert_eq!(ttl_of(&mut con, "key")?, -1);

    redis::cmd("expire.cmd")
        .arg("key")
        .arg("100")
        .exec(&mut con)
        .context("failed to run expire.cmd")?;
    assert!(ttl_of(&mut con, "key")? > 0);

    redis::cmd("expire.cmd")
        .arg("key")
        .arg("-1")
        .exec(&mut con)
        .context("failed to run expire.cmd")?;
    assert_eq!(ttl_of(&mut con, "key")?, -1);

    Ok(())
}
/// Loads the `data_type` module and checks `alloc.set` / `alloc.get`: the
/// first set yields a run of `A`s, a second set on the same key yields a run
/// of `B`s, and after `DEL` the key reads back as nil.
#[test]
fn test_alloc() -> Result<()> {
    let port: u16 = 6509;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("data_type", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    // (size passed to alloc.set, expected reply size, expected fill character)
    for (size_arg, size, fill) in [("10", 10_i64, "A"), ("5", 5_i64, "B")] {
        let reported: i64 = redis::cmd("alloc.set")
            .arg("test_key")
            .arg(size_arg)
            .query(&mut con)
            .context("failed to run alloc.set")?;
        assert_eq!(reported, size);

        let contents: String = redis::cmd("alloc.get")
            .arg("test_key")
            .query(&mut con)
            .context("failed to run alloc.get")?;
        assert_eq!(contents, fill.repeat(size as usize));
    }

    let _: i64 = redis::cmd("DEL")
        .arg("test_key")
        .query(&mut con)
        .context("failed to run DEL")?;

    let after_delete: Option<String> = redis::cmd("alloc.get")
        .arg("test_key")
        .query(&mut con)
        .context("failed to run alloc.get")?;
    assert!(after_delete.is_none());

    Ok(())
}
/// Checks `DEBUG DIGEST` / `DEBUG DIGEST-VALUE` with module data types.
///
/// Part one uses a single `data_type` server: the digest is non-empty while
/// an `alloc.set` key exists and is forty `0` characters once it is deleted.
/// Part two runs two `data_type2` servers side by side and checks that the
/// same `alloc2.set` call yields the same `alloc2.get` value on both, and
/// that both digests become all zeros again after the key is deleted.
#[test]
fn test_debug() -> Result<()> {
    /// Runs `DEBUG digest` on `con` and returns the reply.
    fn debug_digest(con: &mut redis::Connection) -> Result<String> {
        redis::cmd("DEBUG")
            .arg("digest")
            .query(con)
            .context("failed to run DEBUG DIGEST")
    }

    /// Stores `k1` via `alloc2.set`, checks the digest is non-empty, and
    /// returns the non-empty `alloc2.get` reply.
    fn populate_and_read(con: &mut redis::Connection) -> Result<String> {
        let _: i64 = redis::cmd("alloc2.set")
            .arg(&["k1", "3"])
            .query(con)
            .context("failed to run alloc2.set")?;
        let digest = debug_digest(con)?;
        assert!(
            !digest.is_empty(),
            "DEBUG DIGEST should return a non-empty string"
        );
        let value: String = redis::cmd("alloc2.get")
            .arg("k1")
            .query(con)
            .context("failed to run alloc2.get")?;
        assert!(
            !value.is_empty(),
            "alloc2.get should return a non-empty string"
        );
        Ok(value)
    }

    /// Deletes `k1` and returns the digest reported afterwards.
    fn delete_and_digest(con: &mut redis::Connection) -> Result<String> {
        let _: i64 = redis::cmd("DEL")
            .arg("k1")
            .query(con)
            .context("failed to run DEL")?;
        debug_digest(con)
    }

    // Digest value the assertions below expect once the keys are deleted.
    let empty_digest = "0".repeat(40);

    // --- Part one: single server with the `data_type` module. ---
    let port: u16 = 6504;
    // All server handles stay bound until the end of the test.
    let _server = start_valkey_server_with_module("data_type", port)
        .context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let _: i64 = redis::cmd("alloc.set")
        .arg(&["test_key", "10"])
        .query(&mut con)
        .context("failed to run alloc.set")?;

    let digest = debug_digest(&mut con)?;
    assert!(
        !digest.is_empty(),
        "DEBUG DIGEST should return a non-empty string"
    );

    let value_digest: redis::Value = redis::cmd("DEBUG")
        .arg(&["digest-value", "test_key"])
        .query(&mut con)
        .context("failed to run DEBUG DIGEST-VALUE")?;
    assert!(
        !matches!(value_digest, redis::Value::Nil),
        "DEBUG DIGEST-VALUE should not return nil"
    );

    let _: i64 = redis::cmd("DEL")
        .arg("test_key")
        .query(&mut con)
        .context("failed to run DEL")?;
    assert_eq!(debug_digest(&mut con)?, empty_digest);

    // --- Part two: two servers with the `data_type2` module. ---
    let port1: u16 = 6505;
    let _server1 = start_valkey_server_with_module("data_type2", port1)
        .context(FAILED_TO_START_SERVER)?;
    let mut con1 = get_valkey_connection(port1).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let port2: u16 = 6506;
    let _server2 = start_valkey_server_with_module("data_type2", port2)
        .context(FAILED_TO_START_SERVER)?;
    let mut con2 = get_valkey_connection(port2).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let get_res_db1 = populate_and_read(&mut con1)?;
    let get_res_db2 = populate_and_read(&mut con2)?;
    assert_eq!(get_res_db1, get_res_db2);

    let res_db1 = delete_and_digest(&mut con1)?;
    assert_eq!(res_db1, empty_digest);
    let res_db2 = delete_and_digest(&mut con2)?;
    assert_eq!(res_db2, empty_digest);
    assert_eq!(res_db1, res_db2);

    Ok(())
}
/// Loads the `acl` module and uses `COMMAND LIST FILTERBY ACLCAT` to check
/// that its commands are listed under the expected built-in (`read`) and
/// custom (`custom_acl_one`, `custom_acl_two`) ACL categories.
#[test]
fn test_acl_categories() -> Result<()> {
    /// Returns the commands the server lists for ACL category `category`.
    fn commands_in_category(con: &mut redis::Connection, category: &str) -> Result<Vec<String>> {
        redis::cmd("COMMAND")
            .arg(&["LIST", "FILTERBY", "ACLCAT", category])
            .query(con)
            .with_context(|| format!("failed to get list of commands associated with {category}"))
    }

    // (ACL category, commands that must be listed under it)
    const EXPECTATIONS: [(&str, &[&str]); 3] = [
        ("read", &["existing_categories"]),
        ("custom_acl_one", &["custom_category", "custom_categories"]),
        ("custom_acl_two", &["custom_categories"]),
    ];

    let port: u16 = 6503;
    // Bound to a name so the server handle lives until the end of the test.
    let _server = start_valkey_server_with_module("acl", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    for (category, expected_commands) in EXPECTATIONS {
        let listed = commands_in_category(&mut con, category)?;
        for expected in expected_commands {
            assert!(
                listed.iter().any(|command| command == expected),
                "{expected} is not listed under ACL category {category}"
            );
        }
    }

    Ok(())
}
/// Loads the `data_type` module, enables active defragmentation with the
/// most aggressive thresholds, creates many `alloc.set` keys, and checks the
/// defrag statistics as well as the integrity of the stored values.
///
/// The test returns early when the server reports `mem_allocator:libc`.
#[test]
fn test_defrag() -> Result<()> {
    let port: u16 = 6510;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("data_type", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    let memory_info: String = redis::cmd("info")
        .arg("memory")
        .query(&mut con)
        .context("Failed to run info memory")?;
    // Skip the rest when the server uses the libc allocator.
    if memory_info.contains("mem_allocator:libc") {
        return Ok(());
    }

    for (option, value) in [
        ("activedefrag", "yes"),
        ("active-defrag-threshold-lower", "0"),
        ("active-defrag-ignore-bytes", "1"),
    ] {
        redis::cmd("config")
            .arg(&["set", option, value])
            .exec(&mut con)
            .with_context(|| format!("failed to run config set {option}"))?;
    }

    for i in 1..10000 {
        let key = format!("test_key_{}", i);
        let _: i64 = redis::cmd("alloc.set")
            .arg(&key)
            .arg("500")
            .query(&mut con)
            .context("failed to run alloc.set")?;
    }

    let info: String = redis::cmd("info")
        .arg("stats")
        .query(&mut con)
        .context("failed to run info stats")?;
    // Same condition as the original `!(misses:0 || !hits:0)`, via De Morgan.
    // NOTE(review): this requires `active_defrag_hits` to be reported as 0 —
    // confirm that is intended rather than a misplaced parenthesis.
    assert!(!info.contains("active_defrag_misses:0"));
    assert!(info.contains("active_defrag_hits:0"));
    assert!(!info.contains("total_active_defrag_time:0"));

    // Spot-check the first keys: their contents must be unchanged.
    let expected = "A".repeat(500);
    for i in 1..1000 {
        let key = format!("test_key_{}", i);
        let stored: String = redis::cmd("alloc.get")
            .arg(key)
            .query(&mut con)
            .context("failed to run alloc.get")?;
        assert_eq!(stored, expected);
    }

    Ok(())
}
/// Loads the `client` module and walks through its `client.*` commands,
/// checking the reply of each one for a plain local connection.
#[test]
fn test_client() -> Result<()> {
    // (command, expected reply) for the argument-less string getters.
    const GETTERS: [(&str, &str); 5] = [
        ("client.get_name", "test_client"),
        ("client.username", "default"),
        ("client.cert", ""),
        ("client.info", "1"),
        ("client.ip", "127.0.0.1"),
    ];

    let port: u16 = 6507;
    // Bound to a name so the server handle lives until the end of the test.
    let _server =
        start_valkey_server_with_module("client", port).context(FAILED_TO_START_SERVER)?;
    let mut con = get_valkey_connection(port).context(FAILED_TO_CONNECT_TO_SERVER)?;

    redis::cmd("client.id")
        .exec(&mut con)
        .context("failed execute client.id")?;
    redis::cmd("client.set_name")
        .arg("test_client")
        .exec(&mut con)
        .context("failed execute client.set_name")?;

    for (command, expected) in GETTERS {
        let reply: String = redis::cmd(command)
            .query(&mut con)
            .with_context(|| format!("failed execute {command}"))?;
        assert_eq!(reply, expected);
    }

    // Either outcome is accepted, as long as the reply or error text matches.
    match redis::cmd("client.deauth").arg(0).query::<String>(&mut con) {
        Ok(reply) => assert_eq!(reply, "OK"),
        Err(err) => assert!(err
            .to_string()
            .contains("Failed: to deauthenticate and close client")),
    }

    let policy: String = redis::cmd("client.config_get")
        .arg("maxmemory-policy")
        .query(&mut con)
        .context("failed execute client.config_get")?;
    assert_eq!(policy, "noeviction");

    Ok(())
}
/// Checks the command filters provided by the `filter1` and `filter2` modules.
///
/// The server starts with `filter1`; `filter2` is loaded at runtime. While both
/// are loaded, `info` must reply `"info2\n"` and, after `SET foo bar`, reading
/// `new_key` must yield `"new_value"`. Once `filter1` is unloaded, `info` must
/// no longer produce the filtered reply and `SET foo bar` / `GET foo` must
/// round-trip unchanged.
#[test]
fn test_filter() -> Result<()> {
    let port = 6508;
    let _guards =
        vec![start_valkey_server_with_module("filter1", port)
            .with_context(|| FAILED_TO_START_SERVER)?];
    let mut con = get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;

    redis::cmd("MODULE")
        .arg(&["LOAD", get_module_path("filter2").unwrap().as_str()])
        .exec(&mut con)
        .with_context(|| "failed to load filter2 module")?;

    // Behaviour with both modules loaded.
    let filtered_info: String = redis::cmd("info")
        .query(&mut con)
        .with_context(|| "failed execute info")?;
    assert_eq!(filtered_info, "info2\n");
    let _: () = con.set("foo", "bar")?;
    let rewritten_value: String = con.get("new_key")?;
    assert_eq!(rewritten_value, "new_value");

    redis::cmd("MODULE")
        .arg(&["UNLOAD", "filter1"])
        .exec(&mut con)
        .with_context(|| "failed to unload filter1 module")?;

    // Behaviour after filter1 is gone. The comparison must use the exact
    // filtered reply seen above (including its trailing newline); comparing
    // against "info2" would pass even if the filter were still active.
    let plain_info: String = redis::cmd("info")
        .query(&mut con)
        .with_context(|| "failed execute info")?;
    assert_ne!(plain_info, "info2\n");
    let _: () = con.set("foo", "bar")?;
    let stored_value: String = con.get("foo")?;
    assert_eq!(stored_value, "bar");
    Ok(())
}
/// Verifies AUTH outcomes against the `auth` module on port 6513 for two groups
/// of ACL users: users created without a password (expected to authenticate
/// with their `module_pass*` password) and users created with an engine
/// password.
#[test]
fn test_non_blocking_auth_callbacks() -> Result<()> {
    let port = 6513;
    let server =
        start_valkey_server_with_module("auth", port).with_context(|| FAILED_TO_START_SERVER)?;
    let _guards = vec![server];
    let mut con = get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;

    let module_users = [("user1", "module_pass1"), ("user2", "module_pass2")];
    let engine_users = [
        ("engine_user1", "engine_pass1"),
        ("engine_user2", "engine_pass2"),
    ];

    // Module users are registered without a password; engine users with one.
    setup_acl_users(
        &mut con,
        &module_users
            .iter()
            .map(|&(name, _)| (name, None))
            .collect::<Vec<_>>(),
    )?;
    setup_acl_users(
        &mut con,
        &engine_users
            .iter()
            .map(|&(name, password)| (name, Some(password)))
            .collect::<Vec<_>>(),
    )?;

    for &(name, password) in &module_users {
        check_auth(&mut con, name, password, AuthExpectedResult::Success)?;
        check_auth(&mut con, name, "wrong", AuthExpectedResult::Denied)?;
    }
    for &(name, password) in &engine_users {
        check_auth(&mut con, name, password, AuthExpectedResult::Success)?;
        check_auth(&mut con, name, "wrong", AuthExpectedResult::EngineDenied)?;
    }
    Ok(())
}
/// Exercises AUTH against the `auth` module on port 6514 with three
/// connections: one that sets up ACL users and then issues an AUTH on a
/// background thread, one that checks for blocked clients while that AUTH is in
/// flight, and one that runs the remaining credential checks.
#[test]
fn test_blocking_auth_callbacks() -> Result<()> {
    let port = 6514;
    let server =
        start_valkey_server_with_module("auth", port).with_context(|| FAILED_TO_START_SERVER)?;
    let _guards = vec![server];
    let mut background_con =
        get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;
    let mut monitor_con =
        get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;
    let mut check_con = get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;

    let users = [
        ("blockUser4", "module_blockPass4"),
        ("blockUser5", "module_blockPass5"),
        ("blockUser6", "module_blockPass6"),
        ("blockUserDelay", "blockPassDelay"),
    ];
    let engine_users = [
        ("engine_user1", "engine_pass1"),
        ("engine_user2", "engine_pass2"),
    ];
    setup_acl_users(
        &mut background_con,
        &users
            .iter()
            .map(|&(name, _)| (name, None))
            .collect::<Vec<_>>(),
    )?;
    setup_acl_users(
        &mut background_con,
        &engine_users
            .iter()
            .map(|&(name, password)| (name, Some(password)))
            .collect::<Vec<_>>(),
    )?;

    // Issue the AUTH on its own thread so the main thread can look at the
    // server while the request is outstanding. The reply itself is discarded.
    let delayed_auth = thread::spawn(move || {
        let _ = redis::cmd("AUTH")
            .arg(&["blockUserDelay", "blockPassDelay"])
            .query::<String>(&mut background_con);
    });
    thread::sleep(Duration::from_millis(500));
    let blocked = check_blocked_clients(&mut monitor_con)?;
    assert!(blocked > 0, "Expected to see blocked clients during auth");
    delayed_auth.join().unwrap();

    // All correct passwords first, then all wrong ones, in that order.
    for &(name, password) in &users {
        check_auth(&mut check_con, name, password, AuthExpectedResult::Success)?;
    }
    for &(name, _) in &users {
        check_auth(&mut check_con, name, "wrong", AuthExpectedResult::Denied)?;
    }
    for &(name, password) in &engine_users {
        check_auth(&mut check_con, name, password, AuthExpectedResult::Success)?;
        check_auth(
            &mut check_con,
            name,
            "wrong",
            AuthExpectedResult::EngineDenied,
        )?;
    }
    check_auth(
        &mut check_con,
        "blockAbort",
        "abort",
        AuthExpectedResult::Aborted,
    )?;
    Ok(())
}
#[test]
fn test_multiple_inflight_blocking_auth_callbacks() -> Result<()> {
let port = 6515;
let _guards =
vec![start_valkey_server_with_module("auth", port)
.with_context(|| FAILED_TO_START_SERVER)?];
let mut setup_con = get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;
let users_auth1 = [
("blockUser1", "module_blockPass1"),
("blockUser2", "module_blockPass2"),
("blockUser3", "module_blockPass3"),
];
let users_auth2 = [
("blockUser4", "module_blockPass4"),
("blockUser5", "module_blockPass5"),
("blockUser6", "module_blockPass6"),
("blockUserDelay", "blockPassDelay"),
];
setup_acl_users(
&mut setup_con,
&users_auth1
.iter()
.map(|(u, _)| (*u, None))
.collect::<Vec<_>>(),
)?;
setup_acl_users(
&mut setup_con,
&users_auth2
.iter()
.map(|(u, _)| (*u, None))
.collect::<Vec<_>>(),
)?;
let auth2_handles: Vec<_> = users_auth2
.iter()
.map(|(user, _)| {
let mut con = get_valkey_connection(port)
.with_context(|| FAILED_TO_CONNECT_TO_SERVER)
.unwrap();
let user = user.to_string();
let wrong_pass = "wrong".to_string();
std::thread::spawn(move || {
let res: RedisResult<String> = redis::cmd("AUTH")
.arg(&[&user, &wrong_pass])
.query(&mut con);
(user, res)
})
})
.collect();
let auth1_handles: Vec<_> = users_auth1
.iter()
.map(|(user, _)| {
let mut con = get_valkey_connection(port)
.with_context(|| FAILED_TO_CONNECT_TO_SERVER)
.unwrap();
let user = user.to_string();
let wrong_pass = "wrong".to_string();
thread::spawn(move || {
let res: RedisResult<String> = redis::cmd("AUTH")
.arg(&[&user, &wrong_pass])
.query(&mut con);
(user, res)
})
})
.collect();
let auth2_results: Vec<(String, RedisResult<String>)> = auth2_handles
.into_iter()
.map(|handle| handle.join().unwrap())
.collect();
let auth1_results: Vec<(String, RedisResult<String>)> = auth1_handles
.into_iter()
.map(|handle| handle.join().unwrap())
.collect();
for (user, result) in auth2_results {
match result {
Ok(_) => panic!("Authentication should have failed for user {}", user),
Err(e) => {
let err_str = e.to_string();
assert!(
err_str.contains("blocked_auth_reply_two"),
"Wrong callback for user {}: expected 'blocked_auth_reply_two' in error, got: {}",
user,
err_str
);
}
}
}
for (user, result) in auth1_results {
match result {
Ok(_) => panic!("Authentication should have failed for user {}", user),
Err(e) => {
let err_str = e.to_string();
assert!(
err_str.contains("blocked_auth_reply_one"),
"Wrong callback for user {}: expected 'blocked_auth_reply_one' in error, got: {}",
user,
err_str
);
}
}
}
Ok(())
}
/// Kills a client while its AUTH request is still in flight against the `auth`
/// module on port 6516, then checks that no blocked clients remain and that the
/// same credentials authenticate successfully on a fresh connection.
#[test]
fn test_disconnect_client_during_blocking_auth() -> Result<()> {
    let port = 6516;
    let server =
        start_valkey_server_with_module("auth", port).with_context(|| FAILED_TO_START_SERVER)?;
    let _guards = vec![server];
    let mut con = get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;
    let mut monitor_con =
        get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;

    let users = [("blockUserDelay", "blockPassDelay")];
    setup_acl_users(
        &mut con,
        &users.iter().map(|(u, _)| (*u, None)).collect::<Vec<_>>(),
    )?;
    let (user, module_pass) = users[0];

    // The AUTH runs on its own thread; it is expected to end with an
    // "end of file" error once the connection is killed from the main thread.
    let auth_handle = thread::spawn(move || {
        let outcome: RedisResult<String> =
            redis::cmd("AUTH").arg(&[user, module_pass]).query(&mut con);
        let err = match outcome {
            Ok(_) => panic!("Authentication should have failed due to disconnection"),
            Err(e) => e.to_string(),
        };
        assert!(
            err.contains("end of file"),
            "Expected 'end of file' error, got: {}",
            err
        );
    });

    thread::sleep(Duration::from_millis(500));
    let blocked_before = check_blocked_clients(&mut monitor_con)?;
    assert!(
        blocked_before > 0,
        "Expected to see blocked clients during auth"
    );

    redis::cmd("CLIENT")
        .arg(&["KILL", "SKIPME", "YES"])
        .query::<()>(&mut monitor_con)?;
    let blocked_after = check_blocked_clients(&mut monitor_con)?;
    assert_eq!(
        blocked_after, 0,
        "Expected no blocked clients after disconnect"
    );

    if let Err(e) = auth_handle.join() {
        panic!("Auth thread panicked: {:?}", e);
    }

    let mut verify_con =
        get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;
    check_auth(
        &mut verify_con,
        user,
        module_pass,
        AuthExpectedResult::Success,
    )?;
    Ok(())
}
/// Starts a server with the `preload` module on port 6517 and checks that the
/// module can be unloaded with `MODULE UNLOAD preload`.
#[test]
fn test_preload() -> Result<()> {
    let port = 6517;
    let server =
        start_valkey_server_with_module("preload", port).with_context(|| FAILED_TO_START_SERVER)?;
    let _guards = vec![server];
    let mut con = get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;

    let unload = ["UNLOAD", "preload"];
    redis::cmd("MODULE")
        .arg(&unload)
        .exec(&mut con)
        .with_context(|| "failed to unload module")?;
    Ok(())
}
/// Calls `cmd1` from the `subcmd` module on port 6518 with no argument, with
/// `help`, with the `info` sub-commands and with one to three nested `s1`
/// arguments, checking the reply of each call.
#[test]
fn test_subcmd() -> Result<()> {
    let port = 6518;
    let server =
        start_valkey_server_with_module("subcmd", port).with_context(|| FAILED_TO_START_SERVER)?;
    let _guards = vec![server];
    let mut con = get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;

    // Both the bare command and `help` are expected to return five entries.
    let bare_reply: Vec<String> = redis::cmd("cmd1")
        .query(&mut con)
        .with_context(|| "failed execute cmd1")?;
    assert_eq!(bare_reply.len(), 5);
    let help_reply: Vec<String> = redis::cmd("cmd1")
        .arg("help")
        .query(&mut con)
        .with_context(|| "failed execute cmd1 help")?;
    assert_eq!(help_reply.len(), 5);

    let info_key: Vec<String> = redis::cmd("cmd1")
        .arg("info")
        .arg("key")
        .query(&mut con)
        .with_context(|| "failed execute cmd1 info")?;
    assert!(info_key == vec!["key", "value"]);
    let info_array: Vec<Vec<String>> = redis::cmd("cmd1")
        .arg("info")
        .arg("array")
        .query(&mut con)
        .with_context(|| "failed execute cmd1 info")?;
    assert!(info_array.len() == 2);
    assert!(info_array[1] == vec!["a", "b", "c"]);

    // Each iteration appends one more "s1" argument: `cmd1 s1`, `cmd1 s1 s1`,
    // `cmd1 s1 s1 s1`.
    let mut nested_args: Vec<&str> = Vec::new();
    for expected in ["sub1", "sub11", "sub111"] {
        nested_args.push("s1");
        let reply: String = redis::cmd("cmd1")
            .arg(&nested_args)
            .query(&mut con)
            .with_context(|| format!("failed execute cmd1 {}", nested_args.join(" ")))?;
        assert_eq!(reply, expected);
    }
    Ok(())
}
/// Runs the `data_type3` module's commands against key `my-data-type1` on port
/// 6519: `my-get` before any write, the four mutating commands, then `my-get`
/// again. Only success of each command is checked.
#[test]
fn test_data_type3() -> Result<()> {
    let port = 6519;
    let server = start_valkey_server_with_module("data_type3", port)
        .with_context(|| FAILED_TO_START_SERVER)?;
    let _guards = vec![server];
    let mut con = get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;

    let key = "my-data-type1";
    redis::cmd("my-get")
        .arg(key)
        .exec(&mut con)
        .with_context(|| "failed execute my-get")?;

    // Mutating commands and the arguments that follow the key, in send order.
    let mutations: [(&str, &[&str]); 4] = [
        ("my-set-string", &["string1"]),
        ("my-set-number", &["1"]),
        ("my-vec-push", &["a"]),
        ("my-map-insert", &["k1", "v1"]),
    ];
    for (command, trailing_args) in mutations {
        redis::cmd(command)
            .arg(key)
            .arg(trailing_args)
            .exec(&mut con)?;
    }

    redis::cmd("my-get")
        .arg(key)
        .query::<()>(&mut con)
        .with_context(|| "failed execute my-get")?;
    Ok(())
}
/// Starts a server with the `crontab` module on port 6520 and opens a
/// connection to it. No command is sent; the test passes when both steps
/// succeed.
#[test]
fn test_crontab() -> Result<()> {
    let port = 6520;
    let server =
        start_valkey_server_with_module("crontab", port).with_context(|| FAILED_TO_START_SERVER)?;
    let _guards = vec![server];
    let _con = get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;
    Ok(())
}
/// Starts two `server_events` servers (ports 6521 and 6522), makes the second a
/// replica of the first, and checks the replica's
/// `num_master_link_change_events` / `is_master_link_up` values after the
/// replication setup and again after the primary has been shut down.
#[test]
fn test_master_link_change_event() -> Result<()> {
    let primary_port: u16 = 6521;
    let _primary_guards = vec![
        start_valkey_server_with_module("server_events", primary_port)
            .with_context(|| FAILED_TO_START_SERVER)?,
    ];
    let mut primary_con =
        get_valkey_connection(primary_port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;

    let replica_port: u16 = 6522;
    let _replica_guards = vec![
        start_valkey_server_with_module("server_events", replica_port)
            .with_context(|| FAILED_TO_START_SERVER)?,
    ];
    let mut replica_con =
        get_valkey_connection(replica_port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;

    println!("Setting up replication");
    redis::cmd("replicaof")
        .arg("127.0.0.1")
        .arg(primary_port)
        .query::<()>(&mut replica_con)?;

    // Reads the event counter first and the link flag second from the replica.
    let mut read_link_state = || -> RedisResult<(i64, bool)> {
        let events = redis::cmd("num_master_link_change_events").query::<i64>(&mut replica_con)?;
        let link_up = redis::cmd("is_master_link_up").query::<bool>(&mut replica_con)?;
        Ok((events, link_up))
    };

    thread::sleep(Duration::from_secs(10));
    let (events_after_setup, up_after_setup) = read_link_state()?;
    println!(
        "Verifying master link up event after replication setup: \
         num_master_link_change_events {}, is_master_link_up {}",
        events_after_setup, up_after_setup
    );
    assert_eq!(events_after_setup, 1);
    assert!(up_after_setup);

    println!("Shutting down primary server");
    let shutdown_reply = redis::cmd("SHUTDOWN")
        .arg("NOSAVE")
        .query::<String>(&mut primary_con);
    // An I/O error counts as success here, the same as a normal reply.
    let primary_stopped = match &shutdown_reply {
        Ok(_) => true,
        Err(e) => e.is_io_error(),
    };
    if primary_stopped {
        println!("Primary server shut down successfully")
    } else {
        println!("Failed to shut down primary server")
    }
    drop(primary_con);

    thread::sleep(Duration::from_secs(5));
    let (events_after_shutdown, up_after_shutdown) = read_link_state()?;
    println!(
        "Verifying master link up event after primary shutdown: \
         num_master_link_change_events {}, is_master_link_up {}",
        events_after_shutdown, up_after_shutdown
    );
    assert_eq!(events_after_shutdown, 2);
    assert!(!up_after_shutdown);
    Ok(())
}
/// Triggers `bgsave` on a `server_events` server (port 6523) and checks that
/// `num_fork_child_events` reads 1 right after the command and 2 after a
/// 100 ms pause.
#[test]
fn test_fork_child_event() -> Result<()> {
    let port: u16 = 6523;
    let server = start_valkey_server_with_module("server_events", port)
        .with_context(|| FAILED_TO_START_SERVER)?;
    let _guards = vec![server];
    let mut con = get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;

    redis::cmd("bgsave")
        .exec(&mut con)
        .with_context(|| "failed to run bgsave")?;

    let events_right_after = redis::cmd("num_fork_child_events").query::<i64>(&mut con)?;
    assert_eq!(events_right_after, 1);

    thread::sleep(Duration::from_millis(100));
    let events_after_pause = redis::cmd("num_fork_child_events").query::<i64>(&mut con)?;
    assert_eq!(events_after_pause, 2);
    Ok(())
}
/// Attaches a replica (port 6525) to a primary (port 6524), both running
/// `server_events`, and checks that the primary's `num_replica_change_events`
/// reads 1 after the replica has attached and 2 after `replicaof no one`.
#[test]
fn test_replica_change_event() -> Result<()> {
    let primary_port: u16 = 6524;
    let _primary_guards = vec![
        start_valkey_server_with_module("server_events", primary_port)
            .with_context(|| FAILED_TO_START_SERVER)?,
    ];
    let mut primary_con =
        get_valkey_connection(primary_port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;

    let replica_port: u16 = 6525;
    let _replica_guards = vec![
        start_valkey_server_with_module("server_events", replica_port)
            .with_context(|| FAILED_TO_START_SERVER)?,
    ];
    let mut replica_con =
        get_valkey_connection(replica_port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;

    // Attach, then wait five seconds before reading the counter.
    redis::cmd("replicaof")
        .arg("127.0.0.1")
        .arg(primary_port)
        .query::<()>(&mut replica_con)?;
    thread::sleep(Duration::from_millis(5000));
    let events_after_attach =
        redis::cmd("num_replica_change_events").query::<i64>(&mut primary_con)?;
    assert_eq!(events_after_attach, 1);

    // Detach and read the counter again straight away.
    redis::cmd("replicaof")
        .arg("no")
        .arg("one")
        .query::<()>(&mut replica_con)?;
    let events_after_detach =
        redis::cmd("num_replica_change_events").query::<i64>(&mut primary_con)?;
    assert_eq!(events_after_detach, 2);
    Ok(())
}
/// Sets `repl-diskless-load swapdb` on a primary (port 6526) and a replica
/// (port 6527), both running `server_events`, starts replication, and checks
/// that the replica reports 2 for `num_repl_async_load_events` after five
/// seconds.
#[test]
fn test_repl_asnc_load_event() -> Result<()> {
    let diskless_swapdb = ["set", "repl-diskless-load", "swapdb"];

    let primary_port: u16 = 6526;
    let _primary_guards = vec![
        start_valkey_server_with_module("server_events", primary_port)
            .with_context(|| FAILED_TO_START_SERVER)?,
    ];
    let mut primary_con =
        get_valkey_connection(primary_port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;
    redis::cmd("config")
        .arg(&diskless_swapdb)
        .exec(&mut primary_con)
        .with_context(|| "failed to run config set repl-diskless-load")?;

    let replica_port: u16 = 6527;
    let _replica_guards = vec![
        start_valkey_server_with_module("server_events", replica_port)
            .with_context(|| FAILED_TO_START_SERVER)?,
    ];
    let mut replica_con =
        get_valkey_connection(replica_port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;
    redis::cmd("config")
        .arg(&diskless_swapdb)
        .exec(&mut replica_con)
        .with_context(|| "failed to run config set repl-diskless-load")?;

    redis::cmd("replicaof")
        .arg("127.0.0.1")
        .arg(primary_port)
        .query::<()>(&mut replica_con)?;
    thread::sleep(Duration::from_millis(5000));

    let async_load_events =
        redis::cmd("num_repl_async_load_events").query::<i64>(&mut replica_con)?;
    assert_eq!(async_load_events, 2);
    Ok(())
}
/// Issues two `swapdb` commands on a `server_events` server (port 6528) and
/// checks that `num_swapdb_events` reads 1 after the first and 2 after the
/// second.
#[test]
fn test_swapdb_event() -> Result<()> {
    let port: u16 = 6528;
    let server = start_valkey_server_with_module("server_events", port)
        .with_context(|| FAILED_TO_START_SERVER)?;
    let _guards = vec![server];
    let mut con = get_valkey_connection(port).with_context(|| FAILED_TO_CONNECT_TO_SERVER)?;

    // (databases to swap, counter value expected right afterwards)
    let steps = [(["0", "1"], 1_i64), (["1", "2"], 2_i64)];
    for (databases, expected_events) in steps {
        redis::cmd("swapdb")
            .arg(&databases)
            .query::<()>(&mut con)?;
        let events = redis::cmd("num_swapdb_events").query::<i64>(&mut con)?;
        assert_eq!(events, expected_events);
    }
    Ok(())
}