mod constants;
mod utilities;
#[cfg(test)]
mod standalone_client_tests {
use super::*;
use crate::constants::{IP_ADDRESS_V4, IP_ADDRESS_V6};
use crate::utilities::mocks::{Mock, ServerMock};
use ferriskey::{
client::StandaloneClient,
client::types::ReadFrom,
};
use ferriskey::{FromValue, Value};
use rstest::rstest;
use std::collections::HashMap;
use utilities::*;
async fn get_connected_clients(client: &mut StandaloneClient) -> usize {
let mut cmd = ferriskey::Cmd::new();
cmd.arg("CLIENT").arg("LIST");
let result: Value = client.send_command(&cmd).await.expect("CLIENT LIST failed");
match result {
Value::BulkString(bytes) => {
let s = String::from_utf8_lossy(&bytes);
s.lines().count()
}
Value::VerbatimString { format: _, text } => {
text.lines().count()
}
_ => {
panic!("CLIENT LIST did not return a BulkString or VerbatimString, got: {result:?}")
}
}
}
#[rstest]
#[serial_test::serial]
#[timeout(LONG_STANDALONE_TEST_TIMEOUT)]
fn test_detect_disconnect_and_reconnect_using_heartbeat(#[values(false, true)] use_tls: bool) {
let (sender, receiver) = tokio::sync::oneshot::channel();
block_on_all(async move {
let mut test_basics = setup_test_basics(use_tls).await;
let server = test_basics.server.expect("Server shouldn't be None");
let address = server.get_client_addr();
drop(server);
std::thread::spawn(move || {
block_on_all(async move {
let new_server = ValkeyServer::new_with_addr_and_modules(address.clone(), &[]);
wait_for_server_to_become_ready(&address).await;
let _ = sender.send(new_server);
})
});
let _new_server = receiver.await;
tokio::time::sleep(
ferriskey::client::HEARTBEAT_SLEEP_DURATION + std::time::Duration::from_secs(1),
)
.await;
let mut get_command = ferriskey::Cmd::new();
get_command
.arg("GET")
.arg("test_detect_disconnect_and_reconnect_using_heartbeat");
let get_result = test_basics.client.send_command(&get_command).await.unwrap();
assert_eq!(get_result, Value::Nil);
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_automatic_reconnect(#[values(false, true)] use_tls: bool) {
block_on_all(async move {
let shared_config = TestConfiguration {
use_tls,
cluster_mode: ClusterMode::Disabled,
shared_server: true,
..Default::default()
};
let mut validation_client = setup_test_basics_internal(&shared_config).await;
let mut monitoring_client = setup_test_basics_internal(&shared_config).await;
let mut info_clients_cmd = ferriskey::Cmd::new();
info_clients_cmd.arg("INFO").arg("CLIENTS");
let info_clients: String = ferriskey::from_owned_value(
monitoring_client
.client
.send_command(&info_clients_cmd)
.await
.unwrap(),
)
.unwrap();
assert!(info_clients.contains("connected_clients:2"));
kill_connection(&mut validation_client.client).await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let info_clients: String = ferriskey::from_owned_value(
monitoring_client
.client
.send_command(&info_clients_cmd)
.await
.unwrap(),
)
.unwrap();
assert!(info_clients.contains("connected_clients:2"));
assert_connected(&mut validation_client.client).await;
});
}
fn get_mock_addresses(mocks: &[ServerMock]) -> Vec<ferriskey::ConnectionAddr> {
mocks.iter().flat_map(|mock| mock.get_addresses()).collect()
}
fn create_primary_responses() -> HashMap<String, Value> {
let mut primary_responses = std::collections::HashMap::new();
primary_responses.insert(
"*1\r\n$4\r\nPING\r\n".to_string(),
Value::BulkString(b"PONG".to_vec().into()),
);
primary_responses.insert(
"*2\r\n$4\r\nINFO\r\n$11\r\nREPLICATION\r\n".to_string(),
Value::BulkString(b"role:master\r\nconnected_slaves:3\r\n".to_vec().into()),
);
primary_responses.insert(
"*1\r\n$4\r\nINFO\r\n".to_string(),
Value::BulkString(b"# Server\r\navailability_zone:us-east-1a\r\n".to_vec().into()),
);
primary_responses.insert(
"*2\r\n$5\r\nHELLO\r\n$1\r\n3\r\n".to_string(),
Value::Map(vec![
(Value::BulkString(b"proto".to_vec().into()), Value::Int(3)),
(
Value::BulkString(b"role".to_vec().into()),
Value::BulkString(b"master".to_vec().into()),
),
]),
);
primary_responses
}
fn create_replica_response() -> HashMap<String, Value> {
let mut replica_responses = std::collections::HashMap::new();
replica_responses.insert(
"*1\r\n$4\r\nPING\r\n".to_string(),
Value::BulkString(b"PONG".to_vec().into()),
);
replica_responses.insert(
"*2\r\n$4\r\nINFO\r\n$11\r\nREPLICATION\r\n".to_string(),
Value::BulkString(b"role:slave\r\n".to_vec().into()),
);
replica_responses.insert(
"*1\r\n$4\r\nINFO\r\n".to_string(),
Value::BulkString(b"# Server\r\navailability_zone:us-east-1a\r\n".to_vec().into()),
);
replica_responses.insert(
"*2\r\n$5\r\nHELLO\r\n$1\r\n3\r\n".to_string(),
Value::Map(vec![
(Value::BulkString(b"proto".to_vec().into()), Value::Int(3)),
(
Value::BulkString(b"role".to_vec().into()),
Value::BulkString(b"replica".to_vec().into()),
),
]),
);
replica_responses
}
fn create_primary_conflict_mock_two_primaries_one_replica() -> Vec<ServerMock> {
let mut listeners: Vec<std::net::TcpListener> =
(0..3).map(|_| get_listener_on_available_port()).collect();
let primary_1 =
ServerMock::new_with_listener(create_primary_responses(), listeners.pop().unwrap());
let primary_2 =
ServerMock::new_with_listener(create_primary_responses(), listeners.pop().unwrap());
let replica =
ServerMock::new_with_listener(create_replica_response(), listeners.pop().unwrap());
vec![primary_1, primary_2, replica]
}
fn create_primary_mock_with_replicas(replica_count: usize) -> Vec<ServerMock> {
let mut listeners: Vec<std::net::TcpListener> = (0..replica_count + 1)
.map(|_| get_listener_on_available_port())
.collect();
let primary =
ServerMock::new_with_listener(create_primary_responses(), listeners.pop().unwrap());
let mut mocks = vec![primary];
mocks.extend(
listeners
.into_iter()
.map(|listener| ServerMock::new_with_listener(create_replica_response(), listener)),
);
mocks
}
struct ReadFromReplicaTestConfig {
read_from: ReadFrom,
expected_primary_reads: u16,
expected_replica_reads: Vec<u16>,
number_of_initial_replicas: usize,
number_of_missing_replicas: usize,
number_of_replicas_dropped_after_connection: usize,
number_of_requests_sent: usize,
}
impl Default for ReadFromReplicaTestConfig {
fn default() -> Self {
Self {
read_from: ReadFrom::Primary,
expected_primary_reads: 3,
expected_replica_reads: vec![0, 0, 0],
number_of_initial_replicas: 3,
number_of_missing_replicas: 0,
number_of_replicas_dropped_after_connection: 0,
number_of_requests_sent: 3,
}
}
}
fn test_read_from_replica(config: ReadFromReplicaTestConfig) {
let mut servers = create_primary_mock_with_replicas(
config.number_of_initial_replicas - config.number_of_missing_replicas,
);
let mut cmd = ferriskey::cmd("GET");
cmd.arg("foo");
for server in servers.iter() {
for _ in 0..3 {
server.add_response(&cmd, "$-1\r\n".to_string());
}
}
let mut addresses = get_mock_addresses(&servers);
for i in 4 - config.number_of_missing_replicas..4 {
addresses.push(ferriskey::ConnectionAddr::Tcp(
"192.0.2.1".to_string(), 6379 + i as u16,
));
}
let mut connection_request =
create_connection_request(addresses.as_slice(), &Default::default());
connection_request.read_from = Some(config.read_from);
block_on_all(async {
let mut client =
create_test_standalone_client(connection_request, None)
.await
.unwrap();
tracing::info!(
"Test - Closing {} servers after connection established",
config.number_of_replicas_dropped_after_connection
);
for server in servers.drain(1..config.number_of_replicas_dropped_after_connection + 1) {
server.close().await;
}
tracing::info!(
"Test - sending {} messages", config.number_of_requests_sent
);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
for _ in 0..config.number_of_requests_sent {
let _ = client.send_command(&cmd).await;
}
});
assert_eq!(
servers[0].get_number_of_received_commands(),
config.expected_primary_reads
);
let mut replica_reads: Vec<_> = servers
.iter()
.skip(1)
.map(|mock| mock.get_number_of_received_commands())
.collect();
replica_reads.sort();
assert!(config.expected_replica_reads <= replica_reads);
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_from_replica_always_read_from_primary() {
test_read_from_replica(ReadFromReplicaTestConfig::default());
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_from_replica_round_robin() {
test_read_from_replica(ReadFromReplicaTestConfig {
read_from: ReadFrom::PreferReplica,
expected_primary_reads: 0,
expected_replica_reads: vec![1, 1, 1],
..Default::default()
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_from_replica_az_affinity() {
test_read_from_replica(ReadFromReplicaTestConfig {
read_from: ReadFrom::AZAffinity("us-east-1a".to_string()),
expected_primary_reads: 0,
expected_replica_reads: vec![1, 1, 1],
..Default::default()
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_from_replica_az_affinity_replicas_and_primary() {
test_read_from_replica(ReadFromReplicaTestConfig {
read_from: ReadFrom::AZAffinityReplicasAndPrimary("us-east-1a".to_string()),
expected_primary_reads: 0,
expected_replica_reads: vec![1, 1, 1],
..Default::default()
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_from_replica_round_robin_skip_disconnected_replicas() {
test_read_from_replica(ReadFromReplicaTestConfig {
read_from: ReadFrom::PreferReplica,
expected_primary_reads: 0,
expected_replica_reads: vec![1, 2],
number_of_missing_replicas: 1,
..Default::default()
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_from_replica_round_robin_read_from_primary_if_no_replica_is_connected() {
test_read_from_replica(ReadFromReplicaTestConfig {
read_from: ReadFrom::PreferReplica,
expected_primary_reads: 3,
expected_replica_reads: vec![],
number_of_missing_replicas: 3,
..Default::default()
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_from_replica_round_robin_do_not_read_from_disconnected_replica() {
test_read_from_replica(ReadFromReplicaTestConfig {
read_from: ReadFrom::PreferReplica,
expected_primary_reads: 0,
expected_replica_reads: vec![3, 3],
number_of_replicas_dropped_after_connection: 1,
number_of_requests_sent: 6,
..Default::default()
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_from_replica_round_robin_with_single_replica() {
test_read_from_replica(ReadFromReplicaTestConfig {
read_from: ReadFrom::PreferReplica,
expected_primary_reads: 0,
expected_replica_reads: vec![3],
number_of_initial_replicas: 1,
number_of_requests_sent: 3,
..Default::default()
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_primary_conflict_raises_error() {
let mocks = create_primary_conflict_mock_two_primaries_one_replica();
let addresses = get_mock_addresses(&mocks);
let connection_request =
create_connection_request(addresses.as_slice(), &Default::default());
block_on_all(async {
let client_res =
create_test_standalone_client(connection_request, None).await;
assert!(client_res.is_err());
let error = client_res.unwrap_err();
let primary_1_addr = addresses.first().unwrap().to_string();
let primary_2_addr = addresses.get(1).unwrap().to_string();
let replica_addr = addresses.get(2).unwrap().to_string();
let err_msg = format!("{error:?}").to_ascii_lowercase();
assert!(
err_msg.contains("conflict")
&& err_msg.contains(&primary_1_addr)
&& err_msg.contains(&primary_2_addr)
&& !err_msg.contains(&replica_addr)
);
});
}
#[rstest]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_send_acl_request_to_all_nodes() {
let mocks = create_primary_mock_with_replicas(2);
let mut cmd = ferriskey::cmd("ACL");
cmd.arg("SETUSER").arg("foo");
for mock in mocks.iter() {
for _ in 0..3 {
mock.add_response(&cmd, "+OK\r\n".to_string());
}
}
let addresses: Vec<ferriskey::ConnectionAddr> =
mocks.iter().flat_map(|mock| mock.get_addresses()).collect();
let connection_request =
create_connection_request(addresses.as_slice(), &Default::default());
block_on_all(async {
let mut client =
create_test_standalone_client(connection_request, None)
.await
.unwrap();
let result = client.send_command(&cmd).await;
assert_eq!(result, Ok(Value::Okay));
});
for mock in mocks {
assert_eq!(mock.get_number_of_received_commands(), 1);
}
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_set_database_id_after_reconnection() {
let mut client_info_cmd = ferriskey::Cmd::new();
client_info_cmd.arg("CLIENT").arg("INFO");
block_on_all(async move {
let test_basics = setup_test_basics_internal(&TestConfiguration {
database_id: 4,
shared_server: true,
..Default::default()
})
.await;
let mut client = test_basics.client;
let client_info: String = String::from_owned_value(
client.send_command(&client_info_cmd).await.unwrap(),
)
.unwrap();
assert!(client_info.contains("db=4"));
let initial_client_id =
extract_client_id(&client_info).expect("Failed to extract initial client ID");
kill_connection(&mut client).await;
let res = client.send_command(&client_info_cmd).await;
match res {
Err(err) => {
assert!(
err.is_connection_dropped()
|| err.is_timeout()
|| err.kind() == ferriskey::ErrorKind::AllConnectionsUnavailable,
"Expected connection dropped, timeout, or unavailable error, got: {err:?}",
);
let client_info = retry(|| async {
let mut client = client.clone();
String::from_owned_value(
client.send_command(&client_info_cmd).await.unwrap(),
)
.ok()
})
.await;
assert!(client_info.contains("db=4"));
}
Ok(response) => {
let new_client_info: String = String::from_owned_value(response).unwrap();
let new_client_id = extract_client_id(&new_client_info)
.expect("Failed to extract new client ID");
assert_ne!(
initial_client_id, new_client_id,
"Client ID should change after reconnection if command succeeds"
);
assert!(new_client_info.contains("db=4"));
}
}
});
}
#[rstest]
#[serial_test::serial]
#[timeout(LONG_STANDALONE_TEST_TIMEOUT)]
fn test_lazy_connection_establishes_on_first_command(
#[values(ferriskey::ProtocolVersion::RESP2, ferriskey::ProtocolVersion::RESP3)] protocol: ferriskey::ProtocolVersion,
) {
block_on_all(async move {
const USE_TLS: bool = false;
let base_config_for_dedicated_server = utilities::TestConfiguration {
use_tls: USE_TLS,
protocol,
shared_server: false, cluster_mode: ClusterMode::Disabled,
lazy_connect: false, ..Default::default()
};
let mut monitoring_test_basics =
utilities::setup_test_basics_internal(&base_config_for_dedicated_server).await;
let monitoring_client = &mut monitoring_test_basics.client;
let dedicated_server_address = match &monitoring_test_basics.server {
Some(server) => {
server.get_client_addr().clone()
}
None => panic!(
"Expected a dedicated standalone server to be created by setup_test_basics_internal"
),
};
let clients_before_lazy_init = get_connected_clients(monitoring_client).await;
tracing::info!(
"TestStandaloneLazy - Clients before lazy client init (protocol={protocol:?} on dedicated server): {clients_before_lazy_init}"
);
let mut lazy_client_config = base_config_for_dedicated_server.clone();
lazy_client_config.lazy_connect = true;
let mut lazy_client_connection_request_pb = utilities::create_connection_request(
std::slice::from_ref(&dedicated_server_address),
&lazy_client_config,
);
lazy_client_connection_request_pb.cluster_mode_enabled = false;
let core_connection_request: ferriskey::client::types::ConnectionRequest =
lazy_client_connection_request_pb;
let mut lazy_ferriskey_client_enum =
ferriskey::LazyClient::from_config(core_connection_request)
.expect("Failed to construct lazy FerrisKeyClient for dedicated server");
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let clients_after_lazy_init = get_connected_clients(monitoring_client).await; tracing::info!(
"TestStandaloneLazy - Clients after lazy client init (protocol={protocol:?} on dedicated server): {clients_after_lazy_init}"
);
assert_eq!(
clients_after_lazy_init, clients_before_lazy_init,
"Lazy client (on dedicated server) should not connect before the first command. Before: {clients_before_lazy_init}, After: {clients_after_lazy_init}. protocol={protocol:?}"
);
tracing::info!(
"TestStandaloneLazy - Sending first command to lazy client (PING) (protocol={protocol:?} on dedicated server)"
);
assert_connected(&mut lazy_ferriskey_client_enum).await;
let clients_after_first_command = get_connected_clients(monitoring_client).await; tracing::info!(
"TestStandaloneLazy - Clients after first command (protocol={protocol:?} on dedicated server): {clients_after_first_command}"
);
assert_eq!(
clients_after_first_command,
clients_before_lazy_init + 1,
"Lazy client (on dedicated server) should connect after the first command. Before: {clients_before_lazy_init}, After: {clients_after_first_command}. protocol={protocol:?}"
);
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_tls_connection_with_custom_root_cert() {
block_on_all(async move {
let tempdir = tempfile::tempdir().expect("Failed to create temp dir");
let tls_paths = build_tls_file_paths(&tempdir);
let ca_cert_bytes = tls_paths.read_ca_cert_as_bytes();
let server = ValkeyServer::new_with_tls(true, Some(tls_paths));
let server_addr = server.get_client_addr();
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let mut connection_request = create_connection_request(
&[server_addr],
&TestConfiguration {
use_tls: true,
shared_server: false,
..Default::default()
},
);
connection_request.tls_mode = Some(ferriskey::client::types::TlsMode::SecureTls);
connection_request.root_certs = vec![ca_cert_bytes];
let mut client =
create_test_standalone_client(connection_request, None)
.await
.expect("Failed to create client with custom root cert");
assert_connected(&mut client).await;
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_tls_connection_fails_with_wrong_root_cert() {
block_on_all(async move {
let server_tempdir = tempfile::tempdir().expect("Failed to create temp dir");
let server_tls_paths = build_tls_file_paths(&server_tempdir);
let client_tempdir = tempfile::tempdir().expect("Failed to create temp dir");
let client_tls_paths = build_tls_file_paths(&client_tempdir);
let wrong_ca_cert_bytes = client_tls_paths.read_ca_cert_as_bytes();
let server = ValkeyServer::new_with_tls(true, Some(server_tls_paths));
let server_addr = server.get_client_addr();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let mut connection_request = create_connection_request(
&[server_addr],
&TestConfiguration {
use_tls: true,
shared_server: false,
..Default::default()
},
);
connection_request.tls_mode = ferriskey::client::types::TlsMode::SecureTls.into();
connection_request.root_certs = vec![wrong_ca_cert_bytes];
connection_request.connection_retry_strategy =
Some(ferriskey::client::types::ConnectionRetryStrategy {
number_of_retries: 1,
factor: 1,
exponent_base: 1,
..Default::default()
});
let client_result =
create_test_standalone_client(connection_request, None).await;
assert!(
client_result.is_err(),
"Expected connection to fail with wrong root certificate"
);
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_tls_connection_fails_with_invalid_cert_bytes() {
block_on_all(async move {
let server_addr = ferriskey::ConnectionAddr::TcpTls {
host: IP_ADDRESS_V4.to_string(),
port: get_available_port(),
insecure: false,
tls_params: None,
};
let mut connection_request = create_connection_request(
&[server_addr],
&TestConfiguration {
use_tls: true,
shared_server: false,
..Default::default()
},
);
connection_request.tls_mode = ferriskey::client::types::TlsMode::SecureTls.into();
connection_request.root_certs = vec![
b"-----BEGIN CERTIFICATE-----\n!!!invalid base64!!!\n-----END CERTIFICATE-----"
.to_vec(),
];
let client_result =
create_test_standalone_client(connection_request, None).await;
assert!(
client_result.is_err(),
"Expected client creation to fail with invalid certificate bytes"
);
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_tls_connection_fails_with_custom_certs_and_no_tls() {
block_on_all(async move {
let server_addr =
ferriskey::ConnectionAddr::Tcp(IP_ADDRESS_V4.to_string(), get_available_port());
let mut connection_request = create_connection_request(
&[server_addr],
&TestConfiguration {
use_tls: false,
shared_server: false,
..Default::default()
},
);
connection_request.tls_mode = ferriskey::client::types::TlsMode::NoTls.into();
connection_request.root_certs = vec![b"some certificate".to_vec()];
let client_result =
create_test_standalone_client(connection_request, None).await;
assert!(
client_result.is_err(),
"Expected client creation to fail when custom certs provided with NoTls mode"
);
let err = client_result.unwrap_err();
let err_msg = format!("{:?}", err).to_lowercase();
assert!(
err_msg.contains("tls") && err_msg.contains("disabled"),
"Error message should mention TLS being disabled, got: {}",
err_msg
);
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_tls_connection_with_multiple_root_certs_first_invalid() {
block_on_all(async move {
let server_tempdir = tempfile::tempdir().expect("Failed to create temp dir");
let server_tls_paths = build_tls_file_paths(&server_tempdir);
let valid_ca_cert_bytes = server_tls_paths.read_ca_cert_as_bytes();
let invalid_tempdir = tempfile::tempdir().expect("Failed to create temp dir");
let invalid_tls_paths = build_tls_file_paths(&invalid_tempdir);
let invalid_ca_cert_bytes = invalid_tls_paths.read_ca_cert_as_bytes();
let server = ValkeyServer::new_with_tls(true, Some(server_tls_paths));
let server_addr = server.get_client_addr();
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let mut connection_request = create_connection_request(
&[server_addr],
&TestConfiguration {
use_tls: true,
shared_server: false,
..Default::default()
},
);
connection_request.tls_mode = ferriskey::client::types::TlsMode::SecureTls.into();
connection_request.root_certs =
vec![invalid_ca_cert_bytes, valid_ca_cert_bytes];
let mut client =
create_test_standalone_client(connection_request, None)
.await
.expect("Failed to create client with multiple root certs");
assert_connected(&mut client).await;
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_tls_connection_with_client_tls_auth() {
block_on_all(async move {
let tempdir = tempfile::tempdir().expect("Failed to create temp dir");
let tls_paths = build_tls_file_paths(&tempdir);
let ca_cert_bytes = tls_paths.read_ca_cert_as_bytes();
let client_cert_bytes = tls_paths.read_valkey_cert_as_bytes();
let client_key_bytes = tls_paths.read_valkey_key_as_bytes();
let server = ValkeyServer::new_with_addr_tls_modules_and_spawner(
ferriskey::ConnectionAddr::TcpTls {
host: "127.0.0.1".to_string(),
port: get_available_port(),
insecure: false,
tls_params: None,
},
Some(tls_paths.clone()),
&[],
true,
|cmd| cmd.spawn().expect("Failed to spawn server"),
);
let server_addr = server.get_client_addr();
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let mut connection_request = create_connection_request(
&[server_addr],
&TestConfiguration {
use_tls: true,
shared_server: false,
..Default::default()
},
);
connection_request.tls_mode = ferriskey::client::types::TlsMode::SecureTls.into();
connection_request.root_certs = vec![ca_cert_bytes];
connection_request.client_cert = client_cert_bytes;
connection_request.client_key = client_key_bytes;
let mut client =
create_test_standalone_client(connection_request, None)
.await
.expect("Failed to create client with custom root cert");
assert_connected(&mut client).await;
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_tls_connection_with_ip_address_succeeds(
#[values(IP_ADDRESS_V4, IP_ADDRESS_V6)] host: &str,
) {
block_on_all(async move {
let tempdir = tempfile::tempdir().expect("Failed to create temp dir");
let tls_paths = build_tls_file_paths(&tempdir);
let ca_cert_bytes = tls_paths.read_ca_cert_as_bytes();
let ip_addr = ferriskey::ConnectionAddr::TcpTls {
host: host.to_string(),
port: get_available_port(),
insecure: false,
tls_params: None,
};
let _server = ValkeyServer::new_with_addr_tls_modules_and_spawner(
ip_addr.clone(),
Some(tls_paths.clone()),
&[],
false,
|cmd| cmd.spawn().expect("Failed to spawn server"),
);
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
let mut connection_request = create_connection_request(
&[ip_addr],
&TestConfiguration {
use_tls: true,
shared_server: false,
..Default::default()
},
);
connection_request.tls_mode = ferriskey::client::types::TlsMode::SecureTls.into();
connection_request.root_certs = vec![ca_cert_bytes];
let mut client =
create_test_standalone_client(connection_request, None)
.await
.expect("Failed to create client with IP address");
assert_connected(&mut client).await;
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_connection_with_ip_address_succeeds(
#[values(IP_ADDRESS_V4, IP_ADDRESS_V6)] host: &str,
) {
block_on_all(async move {
let ip_addr = ferriskey::ConnectionAddr::Tcp(host.to_string(), get_available_port());
let _server = ValkeyServer::new_with_addr_and_modules(ip_addr.clone(), &[]);
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
let connection_request = create_connection_request(
&[ip_addr],
&TestConfiguration {
shared_server: false,
..Default::default()
},
);
let mut client =
create_test_standalone_client(connection_request, None)
.await
.expect("Failed to create client with IP address");
assert_connected(&mut client).await;
});
}
fn create_replica_only_responses() -> HashMap<String, Value> {
let mut responses = std::collections::HashMap::new();
responses.insert(
"*1\r\n$4\r\nPING\r\n".to_string(),
Value::BulkString(b"PONG".to_vec().into()),
);
responses.insert(
"*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n".to_string(),
Value::BulkString(b"bar".to_vec().into()),
);
responses.insert(
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n".to_string(),
Value::Okay,
);
responses
}
fn create_replica_only_mock() -> ServerMock {
let listener = get_listener_on_available_port();
ServerMock::new_with_listener(create_replica_only_responses(), listener)
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_only_mode_connects_without_primary() {
let mock = create_replica_only_mock();
let addresses = get_mock_addresses(&[mock]);
let mut connection_request =
create_connection_request(addresses.as_slice(), &Default::default());
connection_request.read_only = true;
block_on_all(async {
let client_result =
create_test_standalone_client(connection_request, None).await;
assert!(
client_result.is_ok(),
"read_only mode should connect without requiring a primary node"
);
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_only_mode_blocks_write_commands() {
let servers = create_primary_mock_with_replicas(0);
let mock = &servers[0];
let mut get_cmd = ferriskey::cmd("GET");
get_cmd.arg("foo");
mock.add_response(&get_cmd, "$3\r\nbar\r\n".to_string());
let addresses = get_mock_addresses(&servers);
let mut connection_request =
create_connection_request(addresses.as_slice(), &Default::default());
connection_request.read_only = true;
block_on_all(async {
let mut client =
create_test_standalone_client(connection_request, None)
.await
.unwrap();
let mut set_cmd = ferriskey::cmd("SET");
set_cmd.arg("foo").arg("bar");
let result = client.send_command(&set_cmd).await;
assert!(result.is_err(), "Write command should be blocked");
let err = result.unwrap_err();
assert!(
err.to_string()
.contains("write commands are not allowed in read-only mode"),
"Error message should indicate write commands are not allowed, got: {}",
err
);
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_only_mode_allows_read_commands() {
let servers = create_primary_mock_with_replicas(0);
let mock = &servers[0];
let mut get_cmd = ferriskey::cmd("GET");
get_cmd.arg("foo");
mock.add_response(&get_cmd, "$3\r\nbar\r\n".to_string());
let addresses = get_mock_addresses(&servers);
let mut connection_request =
create_connection_request(addresses.as_slice(), &Default::default());
connection_request.read_only = true;
block_on_all(async {
let mut client =
create_test_standalone_client(connection_request, None)
.await
.unwrap();
let result = client.send_command(&get_cmd).await;
assert!(
result.is_ok(),
"Read command should be allowed in read-only mode, got error: {:?}",
result.err()
);
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_only_mode_rejects_az_affinity() {
let mock = create_replica_only_mock();
let addresses = get_mock_addresses(&[mock]);
let mut connection_request =
create_connection_request(addresses.as_slice(), &Default::default());
connection_request.read_only = true;
connection_request.read_from = Some(ReadFrom::AZAffinity("us-east-1a".to_string()));
block_on_all(async {
let result =
create_test_standalone_client(connection_request, None).await;
assert!(
result.is_err(),
"AZAffinity should be rejected with read_only mode"
);
let err = format!("{:?}", result.unwrap_err());
assert!(
err.contains("read-only mode is not compatible with AZAffinity"),
"Error message should indicate AZAffinity incompatibility, got: {}",
err
);
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_only_mode_rejects_az_affinity_replicas_and_primary() {
let mock = create_replica_only_mock();
let addresses = get_mock_addresses(&[mock]);
let mut connection_request =
create_connection_request(addresses.as_slice(), &Default::default());
connection_request.read_only = true;
connection_request.read_from = Some(ReadFrom::AZAffinityReplicasAndPrimary("us-east-1a".to_string()));
block_on_all(async {
let result =
create_test_standalone_client(connection_request, None).await;
assert!(
result.is_err(),
"AZAffinityReplicasAndPrimary should be rejected with read_only mode"
);
let err = format!("{:?}", result.unwrap_err());
assert!(
err.contains("read-only mode is not compatible with AZAffinity"),
"Error message should indicate AZAffinity incompatibility, got: {}",
err
);
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_only_mode_accepts_prefer_replica() {
let mock = create_replica_only_mock();
let addresses = get_mock_addresses(&[mock]);
let mut connection_request =
create_connection_request(addresses.as_slice(), &Default::default());
connection_request.read_only = true;
connection_request.read_from = ReadFrom::PreferReplica.into();
block_on_all(async {
let result =
create_test_standalone_client(connection_request, None).await;
assert!(
result.is_ok(),
"PreferReplica should be accepted with read_only mode"
);
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_only_mode_accepts_primary_read_from() {
let mock = create_replica_only_mock();
let addresses = get_mock_addresses(&[mock]);
let mut connection_request =
create_connection_request(addresses.as_slice(), &Default::default());
connection_request.read_only = true;
connection_request.read_from = ReadFrom::Primary.into();
block_on_all(async {
let result =
create_test_standalone_client(connection_request, None).await;
assert!(
result.is_ok(),
"Primary ReadFrom should be accepted with read_only mode (reads go to connected nodes)"
);
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_only_mode_skips_info_replication() {
let mock = create_replica_only_mock();
let addresses = get_mock_addresses(&[mock]);
let mut connection_request =
create_connection_request(addresses.as_slice(), &Default::default());
connection_request.read_only = true;
block_on_all(async {
let _client =
create_test_standalone_client(connection_request, None)
.await
.unwrap();
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_normal_mode_requires_primary() {
let mock = create_replica_only_mock();
let addresses = get_mock_addresses(&[mock]);
let connection_request =
create_connection_request(addresses.as_slice(), &Default::default());
block_on_all(async {
let result =
create_test_standalone_client(connection_request, None).await;
assert!(
result.is_err(),
"Normal mode should fail without a primary node"
);
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_read_only_mode_primary_writes_replica_reads() {
let servers = create_primary_mock_with_replicas(1);
let primary_mock = &servers[0];
let replica_mock = &servers[1];
let mut set_cmd = ferriskey::cmd("SET");
set_cmd.arg("test_key").arg("test_value");
primary_mock.add_response(&set_cmd, "+OK\r\n".to_string());
let mut get_cmd = ferriskey::cmd("GET");
get_cmd.arg("test_key");
replica_mock.add_response(&get_cmd, "$10\r\ntest_value\r\n".to_string());
let addresses = get_mock_addresses(&servers);
let primary_address = vec![addresses[0].clone()];
let replica_address = vec![addresses[1].clone()];
block_on_all(async {
let primary_connection_request =
create_connection_request(primary_address.as_slice(), &Default::default());
let mut primary_client =
create_test_standalone_client(primary_connection_request, None)
.await
.expect("Primary client should connect successfully");
let mut replica_connection_request =
create_connection_request(replica_address.as_slice(), &Default::default());
replica_connection_request.read_only = true;
let mut replica_client =
create_test_standalone_client(replica_connection_request, None)
.await
.expect("Read-only replica client should connect successfully");
let write_result = primary_client.send_command(&set_cmd).await;
assert!(
write_result.is_ok(),
"Write to primary should succeed, got error: {:?}",
write_result.err()
);
let read_result = replica_client.send_command(&get_cmd).await;
assert!(
read_result.is_ok(),
"Read from replica should succeed in read-only mode, got error: {:?}",
read_result.err()
);
let value = read_result.unwrap();
assert_eq!(
value,
Value::BulkString(b"test_value".to_vec().into()),
"Read value should match written value"
);
let blocked_write_result = replica_client.send_command(&set_cmd).await;
assert!(
blocked_write_result.is_err(),
"Write command should be blocked on read-only client"
);
let err = blocked_write_result.unwrap_err();
assert!(
err.to_string()
.contains("write commands are not allowed in read-only mode"),
"Error message should indicate write commands are not allowed, got: {}",
err
);
});
}
}