use assert_cmd::Command;
use pretty_assertions::assert_eq;
use rdb::{self, filter, formatter};
use redis::Client;
use rstest::rstest;
use std::fs;
use std::fs::File;
use std::io::BufReader;
use std::path::Path;
use std::path::PathBuf;
use testcontainers::ContainerAsync;
use testcontainers_modules::{
redis::Redis, testcontainers::runners::AsyncRunner, testcontainers::ImageExt,
};
fn run_dump_test(input: PathBuf, format: &str) -> String {
let file_stem = input
.file_stem()
.expect("File should have a name")
.to_string_lossy();
let temp_output = PathBuf::from(format!("/tmp/rdb_test_{}.{}", file_stem, format));
let file = fs::File::open(&input).expect("Failed to open dump file");
let reader = BufReader::new(file);
match format {
"json" => {
let formatter = formatter::JSON::new(Some(temp_output.clone()));
let filter = filter::Simple::new();
rdb::parse(reader, formatter, filter).expect("Failed to parse RDB file");
}
"protocol" => {
let formatter = formatter::Protocol::new(Some(temp_output.clone()));
let filter = filter::Simple::new();
rdb::parse(reader, formatter, filter).expect("Failed to parse RDB file");
}
"plain" => {
let formatter = formatter::Plain::new(Some(temp_output.clone()));
let filter = filter::Simple::new();
rdb::parse(reader, formatter, filter).expect("Failed to parse RDB file");
}
_ => {
panic!("Invalid format: {}", format);
}
}
let actual =
String::from_utf8_lossy(&fs::read(&temp_output).expect("Failed to read output file"))
.into_owned();
fs::remove_file(&temp_output).ok();
actual
}
fn load_expected(path: PathBuf, format: &str) -> String {
let file_stem = path
.file_stem()
.expect("File should have a name")
.to_string_lossy();
let expected_json_path = format!("tests/dumps/{}/{}.{}", format, file_stem, format);
fs::read_to_string(&expected_json_path).unwrap_or_else(|_| {
String::from_utf8_lossy(&fs::read(&expected_json_path).expect("Failed to read file"))
.into_owned()
})
}
#[rstest]
#[case::json("json")]
#[case::plain("plain")]
#[case::protocol("protocol")]
fn test_dump_matches_expected(#[files("tests/dumps/*.rdb")] path: PathBuf, #[case] format: &str) {
let actual = run_dump_test(path.clone(), format);
let expected = load_expected(path.clone(), format);
assert_eq!(
actual,
expected,
"Output doesn't match for {}",
path.display()
);
}
async fn redis_client(major_version: u8, minor_version: u8) -> (Client, ContainerAsync<Redis>) {
let container = Redis::default()
.with_tag(format!("{}.{}-alpine", major_version, minor_version))
.start()
.await
.expect("Failed to start Redis container");
let host_ip = container.get_host().await.unwrap();
let host_port = container.get_host_port_ipv4(6379).await.unwrap();
let url = format!("redis://{}:{}", host_ip, host_port);
let client = Client::open(url).expect("Failed to create Redis client");
(client, container)
}
fn to_resp_format(command: &str, args: &[&str]) -> String {
let mut resp = format!("*{}\r\n", args.len() + 1); resp.push_str(&format!("${}\r\n{}\r\n", command.len(), command));
for arg in args {
resp.push_str(&format!("${}\r\n{}\r\n", arg.len(), arg));
}
resp
}
async fn execute_commands(conn: &mut redis::Connection, commands: &[(&str, Vec<&str>)]) -> String {
let mut resp = String::from("*2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n");
for (cmd, args) in commands {
let mut redis_cmd = redis::cmd(cmd);
for arg in args {
redis_cmd.arg(arg);
}
redis_cmd.exec(conn).unwrap();
resp.push_str(&to_resp_format(cmd, args));
}
resp
}
fn parse_rdb_to_resp(rdb_path: &Path) -> String {
let rdb_file = File::open(rdb_path).unwrap();
let rdb_reader = BufReader::new(rdb_file);
let tmp_file = tempfile::NamedTempFile::new().unwrap();
rdb::parse(
rdb_reader,
rdb::formatter::Protocol::new(Some(tmp_file.path().to_path_buf())),
rdb::filter::Simple::new(),
)
.unwrap();
let output = std::fs::read(tmp_file.path()).unwrap();
String::from_utf8(output).unwrap()
}
fn split_resp_commands(resp: &str) -> Vec<String> {
let mut commands: Vec<String> = resp
.split("*")
.filter(|s| !s.is_empty())
.map(|s| format!("*{}", s))
.collect();
if !commands.is_empty() && commands[0].contains("SELECT") {
commands.remove(0);
}
commands
}
#[rstest]
#[case::redis_6_2(6, 2)]
#[case::redis_7_0(7, 0)]
#[case::redis_7_2(7, 2)]
#[case::redis_7_4(7, 4)]
#[tokio::test]
async fn test_redis_protocol_reproducibility(#[case] major_version: u8, #[case] minor_version: u8) {
let (client, container) = redis_client(major_version, minor_version).await;
let mut conn = client.get_connection().unwrap();
let commands = vec![
("SET", vec!["string", "bar"]),
("HSET", vec!["hash", "name", "John"]),
("HSET", vec!["hash", "age", "25"]),
("SADD", vec!["set_strings", "Earth"]),
("SADD", vec!["set_strings", "Mars"]),
("SADD", vec!["set_strings", "Jupiter"]),
("SADD", vec!["set_numbers", "1"]),
("SADD", vec!["set_numbers", "2"]),
("SADD", vec!["set_numbers", "3"]),
("ZADD", vec!["sorted_set", "1", "a"]),
("ZADD", vec!["sorted_set", "2", "b"]),
("ZADD", vec!["sorted_set", "3", "c"]),
("RPUSH", vec!["list_strings", "Rex"]),
("RPUSH", vec!["list_strings", "Bella"]),
("RPUSH", vec!["list_strings", "Max"]),
("RPUSH", vec!["list_numbers", "1"]),
("RPUSH", vec!["list_numbers", "2"]),
("RPUSH", vec!["list_numbers", "3"]),
];
let expected_resp = execute_commands(&mut conn, &commands).await;
redis::cmd("SAVE").exec(&mut conn).unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let container_id = container.id();
let temp_file = tempfile::NamedTempFile::new().unwrap();
let status = std::process::Command::new("docker")
.args([
"cp",
"-q",
&format!("{}:/data/dump.rdb", container_id),
temp_file.path().to_str().unwrap(),
])
.status()
.expect("Failed to execute docker cp");
assert!(status.success(), "docker cp command failed");
let actual_resp = parse_rdb_to_resp(temp_file.path());
let expected_commands: std::collections::HashSet<_> =
split_resp_commands(&expected_resp).into_iter().collect();
let actual_commands: std::collections::HashSet<_> =
split_resp_commands(&actual_resp).into_iter().collect();
assert_eq!(actual_commands, expected_commands);
}
#[rstest]
fn test_cli_commands_succeed(
#[files("tests/dumps/*.rdb")] path: PathBuf,
#[values("json", "plain", "protocol")] format: &str,
#[values("", "1")] databases: &str,
#[values("", "hash", "set", "list", "sortedset", "string")] types: &str,
) {
let mut cmd = Command::cargo_bin("rdb").unwrap();
cmd.args(["--format", format]);
if !databases.is_empty() {
cmd.args(["--databases", databases]);
}
if !types.is_empty() {
cmd.args(["--type", types]);
}
cmd.arg(&path).assert().success();
}