#[cfg(feature = "tokio")]
use crate::cluster::RedisClusterHandle;
#[cfg(feature = "tokio")]
use crate::error::Result;
#[cfg(feature = "tokio")]
use crate::server::RedisServerHandle;
use std::process::Command;
/// Forcefully terminates the server process behind `handle`.
///
/// Shells out to `kill -9 <pid>` (SIGKILL) using the pid reported by the
/// handle. This is best effort: if `kill` cannot be spawned, or exits with a
/// failure status, the problem is silently ignored.
#[cfg(feature = "tokio")]
pub fn kill_node(handle: &RedisServerHandle) {
    let mut signal = Command::new("kill");
    signal.arg("-9").arg(handle.pid().to_string());
    // The outcome is deliberately discarded; callers get no failure signal.
    let _ = signal.output();
}
/// Suspends the server process behind `handle` without terminating it.
///
/// Shells out to `kill -STOP <pid>` (SIGSTOP). The process stays suspended
/// until it receives SIGCONT. This is best effort: any failure to spawn or
/// run `kill` is silently ignored.
#[cfg(feature = "tokio")]
pub fn freeze_node(handle: &RedisServerHandle) {
    let mut signal = Command::new("kill");
    signal.arg("-STOP").arg(handle.pid().to_string());
    // The outcome is deliberately discarded; callers get no failure signal.
    let _ = signal.output();
}
/// Resumes a server process that was previously suspended.
///
/// Shells out to `kill -CONT <pid>` (SIGCONT) using the pid reported by the
/// handle. This is best effort: any failure to spawn or run `kill` is
/// silently ignored.
#[cfg(feature = "tokio")]
pub fn resume_node(handle: &RedisServerHandle) {
    let mut signal = Command::new("kill");
    signal.arg("-CONT").arg(handle.pid().to_string());
    // The outcome is deliberately discarded; callers get no failure signal.
    let _ = signal.output();
}
/// Issues `CLIENT PAUSE <millis>` against the node.
///
/// `millis` is passed through verbatim as the command's timeout argument.
///
/// # Errors
///
/// Returns whatever error `handle.run` reports.
#[cfg(feature = "tokio")]
pub async fn slow_down(handle: &RedisServerHandle, millis: u64) -> Result<String> {
    let pause_for = millis.to_string();
    handle.run(&["CLIENT", "PAUSE", pause_for.as_str()]).await
}
/// Issues `BGSAVE` against the node and returns the command's reply.
///
/// # Errors
///
/// Returns whatever error `handle.run` reports.
#[cfg(feature = "tokio")]
pub async fn trigger_save(handle: &RedisServerHandle) -> Result<String> {
    let command = ["BGSAVE"];
    handle.run(&command).await
}
/// Issues `FLUSHALL` against the node and returns the command's reply.
///
/// # Errors
///
/// Returns whatever error `handle.run` reports.
#[cfg(feature = "tokio")]
pub async fn flushall(handle: &RedisServerHandle) -> Result<String> {
    let command = ["FLUSHALL"];
    handle.run(&command).await
}
/// Kills the master node that owns `slot` and returns that node's port.
///
/// The owner is resolved via `find_slot_owner`, then terminated with
/// [`kill_node`], so the kill itself is best effort and its outcome is not
/// reported. Note that [`kill_node`] runs an external command synchronously,
/// so the calling task is blocked until that command exits.
///
/// # Errors
///
/// Propagates the error from `find_slot_owner` when no owner for `slot` can
/// be determined; in that case nothing is killed.
#[cfg(feature = "tokio")]
pub async fn kill_master_by_slot(cluster: &RedisClusterHandle, slot: u16) -> Result<u16> {
    let owner = find_slot_owner(cluster, slot).await?;
    // Delegate to the single-node helper so the signal logic lives in one place.
    kill_node(owner);
    Ok(owner.port())
}
/// Kills the master node responsible for `key` and returns that node's port.
///
/// The key is first mapped to its hash slot with `keyslot`; the rest is
/// handled by [`kill_master_by_slot`].
///
/// # Errors
///
/// Propagates any error from the slot lookup or from
/// [`kill_master_by_slot`].
#[cfg(feature = "tokio")]
pub async fn kill_master_by_key(cluster: &RedisClusterHandle, key: &str) -> Result<u16> {
    kill_master_by_slot(cluster, keyslot(cluster, key).await?).await
}
/// Suspends the master node that owns `slot` and returns that node's port.
///
/// The owner is resolved via `find_slot_owner`, then suspended with
/// [`freeze_node`], so the suspension is best effort and its outcome is not
/// reported. Note that [`freeze_node`] runs an external command
/// synchronously, so the calling task is blocked until that command exits.
///
/// # Errors
///
/// Propagates the error from `find_slot_owner` when no owner for `slot` can
/// be determined; in that case nothing is suspended.
#[cfg(feature = "tokio")]
pub async fn freeze_master_by_slot(cluster: &RedisClusterHandle, slot: u16) -> Result<u16> {
    let owner = find_slot_owner(cluster, slot).await?;
    // Delegate to the single-node helper so the signal logic lives in one place.
    freeze_node(owner);
    Ok(owner.port())
}
/// Asks `replica` to take over from its master.
///
/// Sends `CLUSTER FAILOVER` first. If the reply text contains `ERR`, the
/// request is retried once as `CLUSTER FAILOVER FORCE` and the outcome of
/// that second attempt is returned instead.
///
/// # Errors
///
/// Returns whatever error `replica.run` reports for either attempt.
#[cfg(feature = "tokio")]
pub async fn trigger_failover(replica: &RedisServerHandle) -> Result<String> {
    let reply = replica.run(&["CLUSTER", "FAILOVER"]).await?;
    if !reply.contains("ERR") {
        return Ok(reply);
    }
    // The plain failover was refused; fall back to the forced variant.
    replica.run(&["CLUSTER", "FAILOVER", "FORCE"]).await
}
/// Resumes every node of the cluster.
///
/// Calls [`resume_node`] (SIGCONT) for each node returned by
/// `cluster.nodes()`. Each signal is best effort and failures are ignored.
/// Only a continue signal is sent; nothing here restarts a node whose
/// process has been killed.
#[cfg(feature = "tokio")]
pub fn recover(cluster: &RedisClusterHandle) {
    for node in cluster.nodes() {
        // Delegate to the single-node helper so the signal logic lives in one place.
        resume_node(node);
    }
}
/// Resolves the hash slot of `key` by running `CLUSTER KEYSLOT <key>` through
/// the cluster's CLI handle.
///
/// # Errors
///
/// Propagates any error from running the command. A reply that does not
/// parse as a `u16` (after trimming whitespace) is reported as
/// `Error::Timeout` carrying the raw reply in its message.
#[cfg(feature = "tokio")]
async fn keyslot(cluster: &RedisClusterHandle, key: &str) -> Result<u16> {
    let output = cluster.cli().run(&["CLUSTER", "KEYSLOT", key]).await?;
    match output.trim().parse::<u16>() {
        Ok(slot) => Ok(slot),
        Err(_) => Err(crate::error::Error::Timeout {
            message: format!("could not parse CLUSTER KEYSLOT response: {output}"),
        }),
    }
}
/// Finds the handle of the master node that owns `slot`.
///
/// Nodes are asked for `CLUSTER NODES` in the order `cluster.nodes()` yields
/// them. Nodes whose query fails are skipped; the first reply that succeeds
/// is the only one examined. Within that reply, each line flagged `master`
/// has its slot entries (whitespace-separated fields from index 8 onwards)
/// checked against `slot`. The matching line's address field is reduced to a
/// port, which is then looked up among the cluster's own node handles.
///
/// # Errors
///
/// Returns `Error::Timeout` when no node answers, or when the examined reply
/// does not lead to a known node handle for `slot`.
#[cfg(feature = "tokio")]
async fn find_slot_owner(cluster: &RedisClusterHandle, slot: u16) -> Result<&RedisServerHandle> {
    for candidate in cluster.nodes() {
        // A node that fails to answer is skipped in favour of the next one.
        let Ok(topology) = candidate.run(&["CLUSTER", "NODES"]).await else {
            continue;
        };
        for entry in topology.lines() {
            let fields: Vec<&str> = entry.split_whitespace().collect();
            // Need at least one slot field (index 8) and a `master` flag (index 2).
            let is_master_with_slots = fields.len() >= 9 && fields[2].contains("master");
            if !is_master_with_slots {
                continue;
            }
            for slots in &fields[8..] {
                if !slot_in_range(slots, slot) {
                    continue;
                }
                let Some(port) = parse_cluster_node_port(fields[1]) else {
                    continue;
                };
                // Map the advertised port back to one of our own handles.
                for member in cluster.nodes() {
                    if member.port() == port {
                        return Ok(member);
                    }
                }
            }
        }
        // Only the first successful reply is consulted.
        break;
    }
    Err(crate::error::Error::Timeout {
        message: format!("could not find owner of slot {slot}"),
    })
}
/// Reports whether `slot` is covered by one slot entry of a `CLUSTER NODES`
/// line.
///
/// `range_str` is either a single slot (`"5461"`) or an inclusive range
/// (`"0-5460"`). Entries starting with `[` never match, and neither does
/// anything that fails to parse as `u16` values.
fn slot_in_range(range_str: &str, slot: u16) -> bool {
    // Bracketed entries are not plain slot assignments.
    if range_str.starts_with('[') {
        return false;
    }
    match range_str.split_once('-') {
        Some((low, high)) => match (low.parse::<u16>(), high.parse::<u16>()) {
            (Ok(low), Ok(high)) => (low..=high).contains(&slot),
            _ => false,
        },
        None => matches!(range_str.parse::<u16>(), Ok(single) if single == slot),
    }
}
/// Extracts the port from the address field of a `CLUSTER NODES` line.
///
/// Everything from the first `@` onwards is dropped, then the text after the
/// last `:` (or the whole remainder when there is no `:`) is parsed as a
/// `u16`. Returns `None` when that text is not a valid `u16`.
fn parse_cluster_node_port(addr: &str) -> Option<u16> {
    let host_port = addr.split_once('@').map_or(addr, |(before, _)| before);
    let port_text = host_port
        .rsplit_once(':')
        .map_or(host_port, |(_, after)| after);
    port_text.parse().ok()
}
#[cfg(test)]
mod tests {
    use super::*;

    // A single-slot entry matches only that exact slot.
    #[test]
    fn slot_in_range_single() {
        let entry = "5461";
        assert!(slot_in_range(entry, 5461));
        assert!(!slot_in_range(entry, 5462));
    }

    // Ranges are inclusive at both ends and exclude the slot just past the end.
    #[test]
    fn slot_in_range_range() {
        let entry = "0-5460";
        for inside in [0, 5460, 1000] {
            assert!(slot_in_range(entry, inside));
        }
        assert!(!slot_in_range(entry, 5461));
    }

    // Bracketed entries never count as a match.
    #[test]
    fn slot_in_range_import_marker() {
        for marker in ["[123->-abc]", "[123-<-abc]"] {
            assert!(!slot_in_range(marker, 123));
        }
    }

    // The port is taken from the part of the address before the `@`.
    #[test]
    fn parse_port_from_cluster_nodes() {
        let cases = [
            ("127.0.0.1:7000@17000", Some(7000)),
            ("127.0.0.1:7001@17001", Some(7001)),
            ("garbage", None),
        ];
        for (addr, expected) in cases {
            assert_eq!(parse_cluster_node_port(addr), expected);
        }
    }
}