mod constants;
mod utilities;
#[cfg(test)]
mod cluster_client_tests {
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use crate::constants::{IP_ADDRESS_V4, IP_ADDRESS_V6};
use crate::utilities::{
cluster::{
LONG_CLUSTER_TEST_TIMEOUT, ValkeyCluster, SHORT_CLUSTER_TEST_TIMEOUT,
setup_cluster_with_replicas, setup_test_basics_internal,
},
*,
};
use ferriskey::client::Client;
use ferriskey::client::ValkeyClientForTests;
use ferriskey::client::types::ReadFrom;
use ferriskey::{
InfoDict, ProtocolVersion, PubSubSubscriptionInfo, PubSubSubscriptionKind,
ValkeyConnectionInfo, Value,
};
use ferriskey::cluster::routing::{
MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr,
};
use rstest::rstest;
use versions::Versioning;
/// Classifies one node's `INFO REPLICATION` text by the role it reports.
///
/// Returns `(primaries, replicas)`:
/// * `(1, 0)` when the text contains `role:master`,
/// * `(0, 1)` when it contains `role:slave` (and not `role:master`),
/// * `(0, 0)` when neither marker is present.
fn count_primary_or_replica(value: &str) -> (u16, u16) {
    let is_primary = value.contains("role:master");
    // The primary marker takes precedence, so the replica marker only counts without it.
    let is_replica = !is_primary && value.contains("role:slave");
    (u16::from(is_primary), u16::from(is_replica))
}
/// Totals the roles reported across a multi-node `INFO REPLICATION` reply.
///
/// Each map value is classified with `count_primary_or_replica`; the keys are
/// ignored. Returns `(primaries, replicas)` summed over all entries.
fn count_primaries_and_replicas(info_replication: HashMap<String, String>) -> (u16, u16) {
    let mut primaries = 0;
    let mut replicas = 0;
    for node_info in info_replication.values() {
        let (primary, replica) = count_primary_or_replica(node_info);
        primaries += primary;
        replicas += replica;
    }
    (primaries, replicas)
}
/// Sends `INFO REPLICATION` without an explicit route.
///
/// The reply is decoded as a string-to-string map and the test expects three
/// values reporting `role:master` and none reporting `role:slave`.
#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_send_routing_no_provided_route() {
    block_on_all(async {
        let config = TestConfiguration {
            cluster_mode: ClusterMode::Enabled,
            shared_server: true,
            ..Default::default()
        };
        let mut basics = setup_test_basics_internal(config).await;

        let mut info_cmd = ferriskey::cmd("INFO");
        info_cmd.arg("REPLICATION");

        // `None` means the caller supplies no routing information.
        let reply = basics
            .client
            .send_command(&mut info_cmd, None)
            .await
            .unwrap();
        let per_node: HashMap<String, String> = ferriskey::from_owned_value(reply).unwrap();

        let (primaries, replicas) = count_primaries_and_replicas(per_node);
        assert_eq!(primaries, 3);
        assert_eq!(replicas, 0);
    });
}
/// Routes `INFO REPLICATION` with `MultipleNodeRoutingInfo::AllMasters`.
///
/// Expects the decoded map to contain three `role:master` replies and no
/// `role:slave` replies.
#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_send_routing_to_all_primaries() {
    block_on_all(async {
        let config = TestConfiguration {
            cluster_mode: ClusterMode::Enabled,
            shared_server: true,
            ..Default::default()
        };
        let mut basics = setup_test_basics_internal(config).await;

        let mut info_cmd = ferriskey::cmd("INFO");
        info_cmd.arg("REPLICATION");

        let all_primaries = RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllMasters, None));
        let reply = basics
            .client
            .send_command(&mut info_cmd, Some(all_primaries))
            .await
            .unwrap();
        let per_node: HashMap<String, String> = ferriskey::from_owned_value(reply).unwrap();

        let (primaries, replicas) = count_primaries_and_replicas(per_node);
        assert_eq!(primaries, 3);
        assert_eq!(replicas, 0);
    });
}
/// Routes `INFO REPLICATION` with `MultipleNodeRoutingInfo::AllNodes`.
///
/// Expects the decoded map to contain three `role:master` replies and three
/// `role:slave` replies.
#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_send_routing_to_all_nodes() {
    block_on_all(async {
        let config = TestConfiguration {
            cluster_mode: ClusterMode::Enabled,
            shared_server: true,
            ..Default::default()
        };
        let mut basics = setup_test_basics_internal(config).await;

        let mut info_cmd = ferriskey::cmd("INFO");
        info_cmd.arg("REPLICATION");

        let every_node = RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None));
        let reply = basics
            .client
            .send_command(&mut info_cmd, Some(every_node))
            .await
            .unwrap();
        let per_node: HashMap<String, String> = ferriskey::from_owned_value(reply).unwrap();

        let (primaries, replicas) = count_primaries_and_replicas(per_node);
        assert_eq!(primaries, 3);
        assert_eq!(replicas, 3);
    });
}
/// Routes `INFO REPLICATION` to slot 0 with `SlotAddr::Master`.
///
/// The reply is decoded as a single string and must report `role:master`.
#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_send_routing_by_slot_to_primary() {
    block_on_all(async {
        let config = TestConfiguration {
            cluster_mode: ClusterMode::Enabled,
            shared_server: true,
            ..Default::default()
        };
        let mut basics = setup_test_basics_internal(config).await;

        let mut info_cmd = ferriskey::cmd("INFO");
        info_cmd.arg("REPLICATION");

        let slot_primary = RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(
            Route::new(0, SlotAddr::Master),
        ));
        let reply = basics
            .client
            .send_command(&mut info_cmd, Some(slot_primary))
            .await
            .unwrap();
        let node_info: String = ferriskey::from_owned_value(reply).unwrap();

        let (primaries, replicas) = count_primary_or_replica(&node_info);
        assert_eq!(primaries, 1);
        assert_eq!(replicas, 0);
    });
}
/// Routes `INFO REPLICATION` to slot 0 with `SlotAddr::ReplicaOptional` on a
/// client configured with `ReadFrom::PreferReplica`.
///
/// The single-string reply must report `role:slave`.
#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_send_routing_by_slot_to_replica_if_read_from_replica_configuration_allows() {
    block_on_all(async {
        let config = TestConfiguration {
            cluster_mode: ClusterMode::Enabled,
            shared_server: true,
            read_from: Some(ReadFrom::PreferReplica),
            ..Default::default()
        };
        let mut basics = setup_test_basics_internal(config).await;

        let mut info_cmd = ferriskey::cmd("INFO");
        info_cmd.arg("REPLICATION");

        let slot_route = Route::new(0, SlotAddr::ReplicaOptional);
        let routing = RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(slot_route));
        let reply = basics
            .client
            .send_command(&mut info_cmd, Some(routing))
            .await
            .unwrap();
        let node_info: String = ferriskey::from_owned_value(reply).unwrap();

        let (primaries, replicas) = count_primary_or_replica(&node_info);
        assert_eq!(primaries, 0);
        assert_eq!(replicas, 1);
    });
}
/// Routes `INFO REPLICATION` to slot 0 with `SlotAddr::ReplicaRequired` on a
/// client configured with `ReadFrom::Primary`.
///
/// The single-string reply must still report `role:slave`, i.e. the explicit
/// route is expected to win over the client's read-from setting.
#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_send_routing_by_slot_to_replica_override_read_from_replica_configuration() {
    block_on_all(async {
        let config = TestConfiguration {
            cluster_mode: ClusterMode::Enabled,
            shared_server: true,
            read_from: Some(ReadFrom::Primary),
            ..Default::default()
        };
        let mut basics = setup_test_basics_internal(config).await;

        let mut info_cmd = ferriskey::cmd("INFO");
        info_cmd.arg("REPLICATION");

        let slot_route = Route::new(0, SlotAddr::ReplicaRequired);
        let routing = RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(slot_route));
        let reply = basics
            .client
            .send_command(&mut info_cmd, Some(routing))
            .await
            .unwrap();
        let node_info: String = ferriskey::from_owned_value(reply).unwrap();

        let (primaries, replicas) = count_primary_or_replica(&node_info);
        assert_eq!(primaries, 0);
        assert_eq!(replicas, 1);
    });
}
/// Checks that client creation fails when a sharded pub/sub subscription is
/// requested against a server older than 7.0.
///
/// The server version is read from the `redis_version` field of `INFO`. When
/// the server is at or above 7.0 the test performs no further checks. Otherwise
/// it first confirms that a client with only an exact subscription can be
/// created, then adds a sharded subscription and expects creation to fail.
#[rstest]
#[timeout(LONG_CLUSTER_TEST_TIMEOUT)]
fn test_fail_creation_with_unsupported_sharded_pubsub() {
    block_on_all(async {
        let config = TestConfiguration {
            cluster_mode: ClusterMode::Enabled,
            shared_server: false,
            ..Default::default()
        };
        let mut test_basics = setup_cluster_with_replicas(config, 0, 3).await;

        let mut info_cmd = ferriskey::cmd("INFO");
        let random_node = RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random);
        let reply = test_basics
            .client
            .send_command(&mut info_cmd, Some(random_node))
            .await
            .unwrap();
        let info_dict: InfoDict = ferriskey::from_owned_value(reply).unwrap();

        let version = info_dict
            .get::<String>("redis_version")
            .expect("Could not determine engine version from INFO result");
        let (server_ver, min_ver) = match (Versioning::new(version), Versioning::new("7.0")) {
            (Some(server_ver), Some(min_ver)) => (server_ver, min_ver),
            _ => panic!("Failed to parse engine version"),
        };
        // The rejection is only exercised on servers older than 7.0.
        if server_ver >= min_ver {
            return;
        }

        let cluster = test_basics.cluster.unwrap();
        let mut addresses = cluster.get_server_addresses();
        addresses.truncate(1);
        let mut connection_request = ferriskey::client::types::ConnectionRequest {
            addresses: addresses.iter().map(get_address_info).collect(),
            cluster_mode_enabled: true,
            ..Default::default()
        };

        let channel = b"last-slot-channel-{16383}".to_vec();
        let mut subscriptions = PubSubSubscriptionInfo::new();
        subscriptions.insert(
            PubSubSubscriptionKind::Exact,
            std::collections::HashSet::from([channel.clone()]),
        );

        // Baseline: an exact-only subscription must be accepted.
        connection_request.pubsub_subscriptions = Some(subscriptions.clone());
        let _exact_only_client = Client::new(connection_request.clone(), None)
            .await
            .unwrap();

        // Adding a sharded subscription must make creation fail.
        subscriptions.insert(
            PubSubSubscriptionKind::Sharded,
            std::collections::HashSet::from([channel]),
        );
        connection_request.pubsub_subscriptions = Some(subscriptions);
        let sharded_result = Client::new(connection_request, None).await;
        assert!(sharded_result.is_err());
    });
}
/// Counts the client connections reported by `CLIENT LIST` across all primaries.
///
/// The command is routed with `MultipleNodeRoutingInfo::AllMasters`. For a map
/// reply, every bulk-string or verbatim-string entry contributes its number of
/// lines to the total. Entries of any other type, a non-map reply, and a
/// command error are logged as warnings and contribute zero.
async fn get_total_clients_on_shared_cluster_primaries(client: &mut Client) -> usize {
    let mut client_list_cmd = ferriskey::Cmd::new();
    client_list_cmd.arg("CLIENT").arg("LIST");
    let all_primaries = RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllMasters, None));

    tracing::info!(
        "TestClusterLazyHelper - Querying CLIENT LIST on all shared cluster primaries via AllMasters routing."
    );

    let reply = client
        .send_command(&mut client_list_cmd, Some(all_primaries))
        .await;
    let total_clients: usize = match reply {
        Ok(Value::Map(node_results_map)) => node_results_map
            .into_iter()
            .map(|(_node_addr_value, node_result_value)| match node_result_value {
                // One line of CLIENT LIST output is counted as one client.
                Value::BulkString(bytes) => String::from_utf8_lossy(&bytes).lines().count(),
                Value::VerbatimString { text, format: _ } => text.lines().count(),
                _ => {
                    tracing::warn!(
                        "TestClusterLazyHelper - CLIENT LIST from a primary (AllMasters) returned unexpected inner type for a node's result: {node_result_value:?}"
                    );
                    0
                }
            })
            .sum(),
        Ok(other_type) => {
            tracing::warn!(
                "TestClusterLazyHelper - CLIENT LIST with AllMasters routing returned an unexpected type (expected Map): {other_type:?}"
            );
            0
        }
        Err(e) => {
            tracing::warn!(
                "TestClusterLazyHelper - CLIENT LIST with AllMasters routing failed: {e:?}"
            );
            0
        }
    };

    tracing::info!(
        "TestClusterLazyHelper - Total clients found on shared primaries (AllMasters): {total_clients}"
    );
    total_clients
}
/// Verifies that a client created with `database_id: 4` still reports `db=4`
/// in `CLIENT INFO` after its connection has been killed.
///
/// The test returns early (without asserting anything) when
/// `version_greater_or_equal(.., "9.0.0")` is false for the shared cluster.
/// After `kill_connection`, the next `CLIENT INFO` may either fail or succeed;
/// both outcomes are handled below.
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_set_database_id_after_reconnection() {
let mut client_info_cmd = ferriskey::cmd("CLIENT");
client_info_cmd.arg("INFO");
block_on_all(async {
// A separate client is used only to check the server version.
let mut version_check_basics = setup_test_basics_internal(TestConfiguration {
cluster_mode: ClusterMode::Enabled,
shared_server: true,
..Default::default()
})
.await;
if !version_greater_or_equal(&mut version_check_basics.client, "9.0.0").await {
return;
}
// The client under test is configured with database 4.
let mut test_basics = setup_test_basics_internal(TestConfiguration {
cluster_mode: ClusterMode::Enabled,
shared_server: true,
database_id: 4, ..Default::default()
})
.await;
let client_info = test_basics
.client
.send_command(&mut client_info_cmd, None)
.await
.unwrap();
// CLIENT INFO is accepted as either a bulk string or a verbatim string.
let client_info_str = match client_info {
ferriskey::Value::BulkString(bytes) => String::from_utf8_lossy(&bytes).to_string(),
ferriskey::Value::VerbatimString { text, .. } => text,
_ => panic!("Unexpected CLIENT INFO response type: {:?}", client_info),
};
assert!(client_info_str.contains("db=4"));
let initial_client_id =
extract_client_id(&client_info_str).expect("Failed to extract initial client ID");
kill_connection(&mut test_basics.client).await;
// First command after the kill: may surface the dropped connection or already succeed.
let res = test_basics
.client
.send_command(&mut client_info_cmd, None)
.await;
match res {
Err(err) => {
// Only connection-loss style errors are acceptable here.
assert!(
err.is_connection_dropped()
|| err.is_timeout()
|| err.kind() == ferriskey::ErrorKind::AllConnectionsUnavailable,
"Expected connection dropped, timeout, or unavailable error, got: {err:?}",
);
// Re-issue CLIENT INFO through `retry`; the closure yields `None` on a
// command error or an unexpected reply type.
let client_info = retry(|| async {
let mut client = test_basics.client.clone();
let mut cmd = client_info_cmd.clone();
let response = client.send_command(&mut cmd, None).await.ok()?;
match response {
ferriskey::Value::BulkString(bytes) => {
Some(String::from_utf8_lossy(&bytes).to_string())
}
ferriskey::Value::VerbatimString { text, .. } => Some(text),
_ => None,
}
})
.await;
assert!(client_info.contains("db=4"));
}
Ok(response) => {
// The command succeeded immediately: it must come from a new connection
// (different client ID) that is still on database 4.
let new_client_info = match response {
ferriskey::Value::BulkString(bytes) => {
String::from_utf8_lossy(&bytes).to_string()
}
ferriskey::Value::VerbatimString { text, .. } => text,
_ => panic!("Unexpected CLIENT INFO response type: {:?}", response),
};
let new_client_id = extract_client_id(&new_client_info)
.expect("Failed to extract new client ID");
assert_ne!(
initial_client_id, new_client_id,
"Client ID should change after reconnection if command succeeds"
);
assert!(new_client_info.contains("db=4"));
}
}
});
}
/// Verifies that a lazily-connecting cluster client opens no connections until
/// its first command is sent.
///
/// Runs once per protocol version (RESP2 and RESP3). A separate monitoring
/// client counts `CLIENT LIST` lines on all primaries at three points: before
/// the lazy client is constructed, after construction, and after the lazy
/// client's first `PING`. The count must be unchanged by construction and must
/// have grown after the first command.
#[rstest]
#[serial_test::serial]
#[timeout(LONG_CLUSTER_TEST_TIMEOUT)]
fn test_lazy_cluster_connection_establishes_on_first_command(
#[values(ProtocolVersion::RESP2, ProtocolVersion::RESP3)]
protocol: ProtocolVersion,
) {
block_on_all(async move {
const USE_TLS: bool = false;
// Shared settings for both clients; `shared_server: false` requests a cluster
// that is not shared with other tests.
let base_config_for_dedicated_cluster = TestConfiguration {
use_tls: USE_TLS,
protocol,
shared_server: false, cluster_mode: ClusterMode::Enabled,
lazy_connect: false, client_name: Some("base_config".to_string()),
..Default::default()
};
// Monitoring client: connects eagerly and is used only to observe connection counts.
let mut monitoring_client_config = base_config_for_dedicated_cluster.clone();
monitoring_client_config.client_name = Some("monitoring_client".to_string());
let monitoring_test_basics = setup_test_basics_internal(monitoring_client_config).await;
let mut monitoring_client = monitoring_test_basics.client;
let dedicated_cluster_addresses = monitoring_test_basics
.cluster
.as_ref()
.expect("Dedicated cluster (Cluster A) should have been created")
.get_server_addresses();
// Baseline count, taken before the lazy client exists.
let clients_before_lazy_init =
get_total_clients_on_shared_cluster_primaries(&mut monitoring_client).await;
tracing::info!(
"TestClusterLazy - Clients before lazy client init (protocol={protocol:?} on dedicated cluster A): {clients_before_lazy_init}"
);
// Lazy client: same cluster addresses, but with `lazy_connect` enabled.
let mut lazy_client_config = base_config_for_dedicated_cluster;
lazy_client_config.lazy_connect = true;
lazy_client_config.client_name = Some("lazy_config".to_string());
let lazy_connection_request = create_connection_request(
&dedicated_cluster_addresses,
&lazy_client_config, );
let mut lazy_ferriskey_client =
ferriskey::LazyClient::from_config(lazy_connection_request)
.expect("Failed to construct lazy client for Cluster A");
// Constructing the lazy client must not change the connection count.
let clients_after_lazy_init =
get_total_clients_on_shared_cluster_primaries(&mut monitoring_client).await;
tracing::info!(
"TestClusterLazy - Clients after lazy client init (protocol={protocol:?} on dedicated cluster A): {clients_after_lazy_init}"
);
assert_eq!(
clients_after_lazy_init, clients_before_lazy_init,
"Lazy client (on dedicated cluster A) should not establish new connections before the first command. Before: {clients_before_lazy_init}, After: {clients_after_lazy_init}. protocol={protocol:?}"
);
tracing::info!(
"TestClusterLazy - Sending first command to lazy client (PING) (protocol={protocol:?} on dedicated cluster A)"
);
// The first command is what should trigger the actual connection.
let ping_response = lazy_ferriskey_client
.send_command(&mut ferriskey::cmd("PING"), None)
.await;
assert!(
ping_response.is_ok(),
"PING command failed (on dedicated cluster A): {:?}. protocol={:?}",
ping_response.as_ref().err(),
protocol
);
assert_eq!(
ping_response.unwrap(),
ferriskey::Value::SimpleString("PONG".to_string())
);
// After the first command the primaries must report more clients than the baseline.
let clients_after_first_command =
get_total_clients_on_shared_cluster_primaries(&mut monitoring_client).await;
tracing::info!(
"TestClusterLazy - Clients after first command (protocol={protocol:?} on dedicated cluster A): {clients_after_first_command}"
);
assert!(
clients_after_first_command > clients_before_lazy_init,
"Lazy client (on dedicated cluster A) should establish new connections after the first command. Before: {clients_before_lazy_init}, After: {clients_after_first_command}. protocol={protocol:?}"
);
});
}
/// Connects to a TLS cluster using the cluster's own CA certificate as the
/// client's custom root certificate and expects the connection to work.
///
/// The CA bytes are read from the same TLS paths that are handed to
/// `ValkeyCluster::new_with_tls`.
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_cluster_tls_connection_with_custom_root_cert() {
    block_on_all(async move {
        let cert_dir = tempfile::tempdir().expect("Failed to create temp dir");
        let tls_paths = build_tls_file_paths(&cert_dir);
        // Read the CA before the paths are moved into the cluster.
        let ca_cert_bytes = tls_paths.read_ca_cert_as_bytes();
        let cluster = ValkeyCluster::new_with_tls(3, 0, Some(tls_paths));

        let addresses = cluster.get_server_addresses();
        let tls_config = TestConfiguration {
            use_tls: true,
            shared_server: false,
            cluster_mode: ClusterMode::Enabled,
            ..Default::default()
        };
        let mut request = create_connection_request(&addresses, &tls_config);
        request.tls_mode = Some(ferriskey::client::types::TlsMode::SecureTls);
        request.root_certs = vec![ca_cert_bytes];

        let mut client = Client::new(request, None)
            .await
            .expect("Failed to create cluster client with custom root cert");
        assert_connected(&mut client).await;
    });
}
/// Expects client creation to fail when the client's root certificate comes
/// from a different set of TLS files than the one the cluster was started with.
///
/// Two independent temp directories are passed to `build_tls_file_paths`: one
/// for the server and one whose CA is given to the client.
// NOTE(review): presumably each `build_tls_file_paths` call yields a distinct CA — confirm.
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_cluster_tls_connection_fails_with_wrong_root_cert() {
    block_on_all(async move {
        let server_dir = tempfile::tempdir().expect("Failed to create temp dir");
        let server_tls_paths = build_tls_file_paths(&server_dir);
        let client_dir = tempfile::tempdir().expect("Failed to create temp dir");
        let client_tls_paths = build_tls_file_paths(&client_dir);
        let wrong_ca_cert_bytes = client_tls_paths.read_ca_cert_as_bytes();

        let cluster = ValkeyCluster::new_with_tls(3, 0, Some(server_tls_paths));
        let addresses = cluster.get_server_addresses();
        let tls_config = TestConfiguration {
            use_tls: true,
            shared_server: false,
            cluster_mode: ClusterMode::Enabled,
            ..Default::default()
        };
        let mut request = create_connection_request(&addresses, &tls_config);
        request.tls_mode = Some(ferriskey::client::types::TlsMode::SecureTls);
        request.root_certs = vec![wrong_ca_cert_bytes];

        let outcome = Client::new(request, None).await;
        assert!(
            outcome.is_err(),
            "Expected cluster connection to fail with wrong root certificate"
        );
    });
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_cluster_tls_connection_with_ip_address_succeeds(
#[values(IP_ADDRESS_V4, IP_ADDRESS_V6)] host: &str,
) {
block_on_all(async move {
let tempdir = tempfile::tempdir().expect("Failed to create temp dir");
let tls_paths = build_tls_file_paths(&tempdir);
let ca_cert_bytes = tls_paths.read_ca_cert_as_bytes();
let cluster = ValkeyCluster::new_with_tls(3, 0, Some(tls_paths));
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
let default_address = cluster.get_server_addresses()[0].clone();
let ip_addr = match default_address {
ferriskey::ConnectionAddr::TcpTls { port, .. } => ferriskey::ConnectionAddr::TcpTls {
host: host.to_string(),
port,
insecure: false,
tls_params: None,
},
_ => panic!("Expected TLS address"),
};
let mut connection_request = create_connection_request(
&[ip_addr],
&TestConfiguration {
use_tls: true,
cluster_mode: ClusterMode::Enabled,
shared_server: false,
..Default::default()
},
);
connection_request.tls_mode = Some(ferriskey::client::types::TlsMode::SecureTls);
connection_request.root_certs = vec![ca_cert_bytes];
let mut client = Client::new(connection_request, None)
.await
.expect("Failed to create cluster client with IP address");
assert_connected(&mut client).await;
});
}
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_cluster_connection_with_ip_address_succeeds(
#[values(IP_ADDRESS_V4, IP_ADDRESS_V6)] host: &str,
) {
block_on_all(async move {
let cluster = ValkeyCluster::new(false, &None, None, None);
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
let default_address = cluster.get_server_addresses()[0].clone();
let ip_addr = match default_address {
ferriskey::ConnectionAddr::Tcp(_, port) => {
ferriskey::ConnectionAddr::Tcp(host.to_string(), port)
}
_ => panic!("Expected TCP address"),
};
let connection_request = create_connection_request(
&[ip_addr],
&TestConfiguration {
cluster_mode: ClusterMode::Enabled,
shared_server: false,
..Default::default()
},
);
let mut client = Client::new(connection_request, None)
.await
.expect("Failed to create cluster client with IP address");
assert_connected(&mut client).await;
});
}
/// Verifies that creating a cluster client fails with a permission error when
/// the authenticating user is not allowed to run the `cluster` command.
///
/// Steps:
/// 1. Start a non-shared cluster and, through an unrestricted client, create
///    the ACL user `restricted_user` on all nodes with `+@all -cluster`.
/// 2. Build a connection request that authenticates as that user.
/// 3. Expect `Client::new` to return an error whose debug representation
///    contains `PermissionDenied`.
#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
#[serial_test::serial]
fn test_cluster_connection_fails_with_permission_denied() {
    block_on_all(async {
        let test_basics = setup_test_basics_internal(TestConfiguration {
            cluster_mode: ClusterMode::Enabled,
            shared_server: false,
            ..Default::default()
        })
        .await;
        let cluster = test_basics.cluster.as_ref().unwrap();
        let mut client = test_basics.client;

        let username = "restricted_user";
        let password = "test_password_456";

        // Grant everything, then take away the `cluster` command.
        let mut cmd = ferriskey::cmd("ACL");
        cmd.arg("SETUSER")
            .arg(username)
            .arg("on")
            .arg("allkeys")
            .arg("+@all")
            .arg("-cluster")
            .arg(format!(">{password}"));
        // The user is created on every node, not only on the one serving this command.
        let routing = RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None));
        let set_user = client.send_command(&mut cmd, Some(routing)).await;
        assert!(
            set_user.is_ok(),
            "Failed to set up ACL users for testing {:?}",
            set_user.err()
        );

        let restricted_configuration = TestConfiguration {
            cluster_mode: ClusterMode::Enabled,
            connection_info: Some(ValkeyConnectionInfo {
                username: Some(username.to_string()),
                password: Some(password.to_string()),
                ..Default::default()
            }),
            ..Default::default()
        };
        let addresses = cluster.get_server_addresses();
        let mut connection_request =
            create_connection_request(&addresses, &restricted_configuration);
        connection_request.connection_timeout = Some(10_000);

        // Extract the error in one step so there is no path on which the
        // permission check below is silently skipped.
        let err = match Client::new(connection_request, None).await {
            Ok(_) => panic!("Expected connection to fail with cluster slots permission"),
            Err(err) => err,
        };
        let error_msg = format!("{err:?}");
        assert!(
            error_msg.contains("PermissionDenied"),
            "Error should be a permission error, got: {error_msg}"
        );
    });
}
/// Checks that commands keep completing (with success, error, or timeout)
/// while the client is trying to reconnect to its initial nodes.
///
/// Setup: a three-node cluster plus one extra "blackhole" initial node — a local
/// TCP listener that accepts connections and keeps them open without ever
/// reading or writing. After a successful `PING`, every cluster server process
/// is killed with `kill -9`. The test then issues `GET` commands for
/// `WINDOW_MS` and requires at least `MIN_COMMANDS_WITH_FIX` attempts to
/// finish within that window.
#[cfg(unix)]
#[rstest]
#[timeout(LONG_CLUSTER_TEST_TIMEOUT)]
fn test_reconnect_to_initial_nodes_doesnt_block_throughput() {
block_on_all(async {
// Connection timeout handed to the cluster client builder.
const CONNECTION_TIMEOUT_MS: u64 = 2000;
// Length of the measurement window for the send loop.
const WINDOW_MS: u64 = 3000;
// Minimum number of loop iterations that must finish inside the window.
const MIN_COMMANDS_WITH_FIX: u32 = 10;
// Port 0 lets the OS choose a free port for the blackhole listener.
let blackhole_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind blackhole listener");
let blackhole_port = blackhole_listener.local_addr().unwrap().port();
// Accepted streams are stored so they stay open for the duration of the test.
let held_streams = Arc::new(std::sync::Mutex::new(Vec::<tokio::net::TcpStream>::new()));
let held_clone = held_streams.clone();
tokio::spawn(async move {
while let Ok((stream, _)) = blackhole_listener.accept().await {
held_clone.lock().unwrap().push(stream);
}
});
let cluster = ValkeyCluster::new(false, &None, Some(3), Some(0));
let cluster_addresses = cluster.get_server_addresses();
// Captured up front so the servers can be killed later.
let pids = cluster.all_server_pids();
let mut initial_nodes: Vec<ferriskey::ConnectionInfo> = cluster_addresses
.iter()
.map(|addr| ferriskey::ConnectionInfo {
addr: addr.clone(),
valkey: ferriskey::ValkeyConnectionInfo::default(),
})
.collect();
// The blackhole is added as an extra initial node alongside the real ones.
initial_nodes.push(ferriskey::ConnectionInfo {
addr: ferriskey::ConnectionAddr::Tcp("127.0.0.1".to_string(), blackhole_port),
valkey: ferriskey::ValkeyConnectionInfo::default(),
});
let cluster_client = ferriskey::cluster::compat::ClusterClientBuilder::new(initial_nodes)
.periodic_connections_checks(Some(Duration::from_millis(100)))
.connection_timeout(Duration::from_millis(CONNECTION_TIMEOUT_MS))
.slots_refresh_rate_limit(Duration::from_millis(0), 0)
.build()
.expect("build cluster client");
let mut conn: ferriskey::cluster::ClusterConnection = cluster_client
.get_async_connection(None, None, None)
.await
.expect("connect to cluster");
// Sanity check: the connection works while the cluster is still up.
let ping: ferriskey::Result<ferriskey::Value> = conn
.route_command(
&ferriskey::cmd("PING"),
ferriskey::cluster::routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random),
)
.await;
assert!(ping.is_ok(), "PING before kill failed: {:?}", ping);
// Kill every server process; a failure to run `kill` is ignored (best effort).
for pid in &pids {
std::process::Command::new("kill")
.args(["-9", &pid.to_string()])
.output()
.ok();
}
tokio::time::sleep(Duration::from_millis(500)).await;
let mut completed: u32 = 0;
let mut cmd = ferriskey::cmd("GET");
cmd.arg("{test}key");
let window_start = std::time::Instant::now();
// Each iteration counts as completed whatever its outcome: the result is
// discarded, and the outer timeout bounds how long one attempt may take.
while window_start.elapsed() < Duration::from_millis(WINDOW_MS) {
let _ = tokio::time::timeout(
Duration::from_millis(CONNECTION_TIMEOUT_MS + 500),
conn.route_command(
&cmd,
ferriskey::cluster::routing::RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(Route::new(0, SlotAddr::Master)),
),
),
)
.await;
completed += 1;
}
assert!(
completed >= MIN_COMMANDS_WITH_FIX,
"Send loop blocked: only {}/{} commands completed in {}ms. \
Each command serialized behind ready!(reconnect_to_initial_nodes) blocking for {}ms.",
completed,
MIN_COMMANDS_WITH_FIX,
WINDOW_MS,
CONNECTION_TIMEOUT_MS,
);
});
}
}