Skip to main content

redis_server_wrapper/
chaos.rs

1//! Fault injection primitives for testing Redis client resilience.
2//!
3//! This module provides operations for simulating failures in Redis
4//! topologies: killing nodes, freezing processes (SIGSTOP/SIGCONT),
5//! triggering failovers, and more. All operations work with the handle
6//! types returned by the server, cluster, and sentinel builders.
7//!
8//! # Example
9//!
10//! ```no_run
11//! use redis_server_wrapper::{RedisCluster, chaos};
12//! use std::time::Duration;
13//!
14//! # async fn example() {
15//! let cluster = RedisCluster::builder()
16//!     .masters(3)
17//!     .replicas_per_master(1)
18//!     .base_port(7100)
19//!     .start()
20//!     .await
21//!     .unwrap();
22//!
23//! // Freeze a node (SIGSTOP) -- it stops processing but stays in memory.
24//! chaos::freeze_node(cluster.node(0));
25//!
26//! // ... test client behavior with a frozen node ...
27//!
28//! // Resume the node (SIGCONT).
29//! chaos::resume_node(cluster.node(0));
30//! # }
31//! ```
32
33#[cfg(feature = "tokio")]
34use crate::cluster::RedisClusterHandle;
35#[cfg(feature = "tokio")]
36use crate::error::Result;
37#[cfg(feature = "tokio")]
38use crate::server::RedisServerHandle;
39
40use std::process::Command;
41
42// ---------------------------------------------------------------------------
43// Node-level operations
44// ---------------------------------------------------------------------------
45
/// Terminate a node instantly with SIGKILL.
///
/// The process gets no chance to flush or clean up, mimicking a hard
/// crash such as an OOM kill or sudden hardware loss.
#[cfg(feature = "tokio")]
pub fn kill_node(handle: &RedisServerHandle) {
    let target = handle.pid().to_string();
    let _ = Command::new("kill").args(["-9", target.as_str()]).output();
}
55
/// Suspend a node with SIGSTOP.
///
/// The process keeps its memory but stops executing: no command
/// processing, no PING replies, so clients observe timeouts. Undo with
/// [`resume_node`].
///
/// Compared to [`kill_node`] this is the better tool for timeout and
/// partition tests, since the node comes back with all its state intact.
#[cfg(feature = "tokio")]
pub fn freeze_node(handle: &RedisServerHandle) {
    let target = handle.pid().to_string();
    let _ = Command::new("kill").args(["-STOP", target.as_str()]).output();
}
69
/// Wake a SIGSTOP-suspended node with SIGCONT.
///
/// Execution continues exactly where it was paused; pending writes and
/// replication catch up on their own.
#[cfg(feature = "tokio")]
pub fn resume_node(handle: &RedisServerHandle) {
    let target = handle.pid().to_string();
    let _ = Command::new("kill").args(["-CONT", target.as_str()]).output();
}
79
/// Delay client commands for `millis` milliseconds via `CLIENT PAUSE`.
///
/// The server itself keeps running -- replication and the cluster bus
/// are unaffected, unlike [`freeze_node`]. Normal service resumes on its
/// own once the pause window elapses.
#[cfg(feature = "tokio")]
pub async fn slow_down(handle: &RedisServerHandle, millis: u64) -> Result<String> {
    let duration = millis.to_string();
    handle.run(&["CLIENT", "PAUSE", &duration]).await
}
89
/// Kick off an asynchronous RDB snapshot with `BGSAVE`.
#[cfg(feature = "tokio")]
pub async fn trigger_save(handle: &RedisServerHandle) -> Result<String> {
    let cmd = ["BGSAVE"];
    handle.run(&cmd).await
}
95
/// Wipe every key on the node with `FLUSHALL`.
#[cfg(feature = "tokio")]
pub async fn flushall(handle: &RedisServerHandle) -> Result<String> {
    let cmd = ["FLUSHALL"];
    handle.run(&cmd).await
}
101
102// ---------------------------------------------------------------------------
103// Cluster-level operations
104// ---------------------------------------------------------------------------
105
/// SIGKILL the master that owns a given hash slot.
///
/// Resolves the slot owner via [`find_slot_owner`] (which parses
/// `CLUSTER NODES` output), then kills that process.
///
/// Returns `Ok(port)` of the killed node, or an error when no owner for
/// the slot could be determined.
#[cfg(feature = "tokio")]
pub async fn kill_master_by_slot(cluster: &RedisClusterHandle, slot: u16) -> Result<u16> {
    let master = find_slot_owner(cluster, slot).await?;
    let _ = Command::new("kill")
        .args(["-9", &master.pid().to_string()])
        .output();
    Ok(master.port())
}
120
/// SIGKILL the master that owns the hash slot of `key`.
///
/// The slot is computed server-side with `CLUSTER KEYSLOT`; the actual
/// kill is delegated to [`kill_master_by_slot`].
#[cfg(feature = "tokio")]
pub async fn kill_master_by_key(cluster: &RedisClusterHandle, key: &str) -> Result<u16> {
    let target_slot = keyslot(cluster, key).await?;
    kill_master_by_slot(cluster, target_slot).await
}
130
/// SIGSTOP the master that owns a given hash slot.
///
/// Same lookup as [`kill_master_by_slot`], but the node is merely
/// suspended and can be brought back with [`recover`].
#[cfg(feature = "tokio")]
pub async fn freeze_master_by_slot(cluster: &RedisClusterHandle, slot: u16) -> Result<u16> {
    let master = find_slot_owner(cluster, slot).await?;
    let _ = Command::new("kill")
        .args(["-STOP", &master.pid().to_string()])
        .output();
    Ok(master.port())
}
142
/// Ask a replica to take over its master with `CLUSTER FAILOVER`.
///
/// Should the plain failover be rejected (reply contains "ERR", e.g.
/// because the master is unreachable), a second attempt is made with
/// `CLUSTER FAILOVER FORCE`.
#[cfg(feature = "tokio")]
pub async fn trigger_failover(replica: &RedisServerHandle) -> Result<String> {
    let first_attempt = replica.run(&["CLUSTER", "FAILOVER"]).await?;
    if !first_attempt.contains("ERR") {
        return Ok(first_attempt);
    }
    replica.run(&["CLUSTER", "FAILOVER", "FORCE"]).await
}
155
/// SIGCONT every node in the cluster.
///
/// Blanket recovery after partition experiments: the signal is sent to
/// all nodes, frozen or not (SIGCONT is harmless on a running process).
#[cfg(feature = "tokio")]
pub fn recover(cluster: &RedisClusterHandle) {
    for frozen in cluster.nodes() {
        let _ = Command::new("kill")
            .args(["-CONT", &frozen.pid().to_string()])
            .output();
    }
}
167
168// ---------------------------------------------------------------------------
169// Helpers
170// ---------------------------------------------------------------------------
171
/// Ask the cluster which hash slot `key` maps to (`CLUSTER KEYSLOT`).
#[cfg(feature = "tokio")]
async fn keyslot(cluster: &RedisClusterHandle, key: &str) -> Result<u16> {
    let output = cluster.cli().run(&["CLUSTER", "KEYSLOT", key]).await?;
    // NOTE(review): Error::Timeout is reused here for a parse failure --
    // a dedicated variant might fit better; preserved as-is.
    match output.trim().parse::<u16>() {
        Ok(slot) => Ok(slot),
        Err(_) => Err(crate::error::Error::Timeout {
            message: format!("could not parse CLUSTER KEYSLOT response: {output}"),
        }),
    }
}
184
/// Find the handle of the cluster node that owns a given slot.
///
/// Queries `CLUSTER NODES`, whose text output has one line per node:
///
/// ```text
/// <id> <ip:port@bus> <flags> <master> <ping> <pong> <epoch> <link> <slot-ranges...>
/// ```
///
/// For each master line, the slot ranges (field 9 onward) are checked;
/// on a match, the port parsed from `ip:port@bus` is mapped back to one
/// of our node handles. Errors if no owner could be determined.
#[cfg(feature = "tokio")]
async fn find_slot_owner(cluster: &RedisClusterHandle, slot: u16) -> Result<&RedisServerHandle> {
    // Use CLUSTER NODES which gives a simpler text format to parse.
    // Each line: <id> <ip:port@bus> <flags> <master> <ping> <pong> <epoch> <link> <slot-ranges...>
    for node in cluster.nodes() {
        if let Ok(info) = node.run(&["CLUSTER", "NODES"]).await {
            for line in info.lines() {
                let parts: Vec<&str> = line.split_whitespace().collect();
                // Lines without slot assignments have fewer than 9 fields.
                if parts.len() < 9 {
                    continue;
                }
                let flags = parts[2];
                // Only masters own slots; skip replicas and handshake entries.
                if !flags.contains("master") {
                    continue;
                }
                // Check slot ranges (parts[8..])
                for range_str in &parts[8..] {
                    if slot_in_range(range_str, slot) {
                        // Extract port from ip:port@bus
                        if let Some(port) = parse_cluster_node_port(parts[1]) {
                            // Find the matching handle
                            for n in cluster.nodes() {
                                if n.port() == port {
                                    return Ok(n);
                                }
                            }
                            // No handle with that port: keep scanning --
                            // the owner may be a node we don't manage.
                        }
                    }
                }
            }
            // Only need to query one node: every node reports the full
            // topology, so stop after the first successful reply.
            break;
        }
        // On query failure, fall through and try the next node.
    }
    // NOTE(review): Error::Timeout is reused for "owner not found" --
    // consider a dedicated variant.
    Err(crate::error::Error::Timeout {
        message: format!("could not find owner of slot {slot}"),
    })
}
235
/// Does `slot` fall inside a CLUSTER NODES slot token?
///
/// Tokens are either a single slot (`"5461"`), an inclusive range
/// (`"0-5460"`), or a bracketed import/migration marker such as
/// `"[123->-node]"`, which never counts as ownership.
fn slot_in_range(range_str: &str, slot: u16) -> bool {
    // Migration/import markers start with '[' and are not owned slots.
    if range_str.starts_with('[') {
        return false;
    }
    match range_str.split_once('-') {
        Some((lo, hi)) => {
            // Both bounds must parse; anything else is not a valid range.
            let (Ok(lo), Ok(hi)) = (lo.parse::<u16>(), hi.parse::<u16>()) else {
                return false;
            };
            (lo..=hi).contains(&slot)
        }
        None => range_str.parse::<u16>().ok() == Some(slot),
    }
}
254
/// Extract the client port from a CLUSTER NODES address (`"ip:port@busport"`).
///
/// Returns `None` when the final `:`-separated segment before `@` is not
/// a valid `u16`.
fn parse_cluster_node_port(addr: &str) -> Option<u16> {
    // Drop the "@busport" suffix if present.
    let host_port = match addr.split_once('@') {
        Some((hp, _)) => hp,
        None => addr,
    };
    // Take the last ':'-separated segment (handles IPv6-ish addresses);
    // with no ':' at all, the whole string is treated as the port.
    let port_str = match host_port.rsplit_once(':') {
        Some((_, p)) => p,
        None => host_port,
    };
    port_str.parse().ok()
}
261
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn single_slot_token_matches_exactly() {
        assert!(slot_in_range("5461", 5461));
        assert!(!slot_in_range("5461", 5462));
    }

    #[test]
    fn range_token_bounds_are_inclusive() {
        for slot in [0, 1000, 5460] {
            assert!(slot_in_range("0-5460", slot));
        }
        assert!(!slot_in_range("0-5460", 5461));
    }

    #[test]
    fn migration_markers_never_match() {
        for marker in ["[123->-abc]", "[123-<-abc]"] {
            assert!(!slot_in_range(marker, 123));
        }
    }

    #[test]
    fn port_parses_from_node_address() {
        assert_eq!(parse_cluster_node_port("127.0.0.1:7000@17000"), Some(7000));
        assert_eq!(parse_cluster_node_port("127.0.0.1:7001@17001"), Some(7001));
        assert_eq!(parse_cluster_node_port("garbage"), None);
    }
}