redis_server_wrapper/chaos.rs
//! Fault injection primitives for testing Redis client resilience.
//!
//! This module provides operations for simulating failures in Redis
//! topologies: killing nodes, freezing processes (SIGSTOP/SIGCONT),
//! triggering failovers, and more. All operations work with the handle
//! types returned by the server, cluster, and sentinel builders.
//!
//! # Example
//!
//! ```no_run
//! use redis_server_wrapper::{RedisCluster, chaos};
//!
//! # async fn example() {
//! let cluster = RedisCluster::builder()
//!     .masters(3)
//!     .replicas_per_master(1)
//!     .base_port(7100)
//!     .start()
//!     .await
//!     .unwrap();
//!
//! // Freeze a node (SIGSTOP) -- it stops processing but stays in memory.
//! chaos::freeze_node(cluster.node(0));
//!
//! // ... test client behavior with a frozen node ...
//!
//! // Resume the node (SIGCONT).
//! chaos::resume_node(cluster.node(0));
//! # }
//! ```

#[cfg(feature = "tokio")]
use crate::cluster::RedisClusterHandle;
#[cfg(feature = "tokio")]
use crate::error::Result;
#[cfg(feature = "tokio")]
use crate::server::RedisServerHandle;

use std::process::Command;

// ---------------------------------------------------------------------------
// Node-level operations
// ---------------------------------------------------------------------------

/// Kill a node immediately with SIGKILL.
///
/// The process is terminated without any chance to clean up. This simulates
/// a hard crash (e.g., OOM kill, hardware failure).
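///
/// # Example
///
/// A minimal sketch; the builder arguments simply mirror the module-level
/// example and are not required values.
///
/// ```no_run
/// # use redis_server_wrapper::{RedisCluster, chaos};
/// # async fn example() {
/// # let cluster = RedisCluster::builder().masters(3).replicas_per_master(1).base_port(7100).start().await.unwrap();
/// // Hard-crash node 0; it gets no chance to flush or hand off slots.
/// chaos::kill_node(cluster.node(0));
/// # }
/// ```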
#[cfg(feature = "tokio")]
pub fn kill_node(handle: &RedisServerHandle) {
    let pid = handle.pid().to_string();
    let _ = Command::new("kill").args(["-9", &pid]).output();
}

/// Freeze a node by sending SIGSTOP.
///
/// The process is suspended -- it stops processing commands and won't
/// respond to PING, but stays in memory. Clients will see timeouts.
/// Use [`resume_node`] to unfreeze it.
///
/// This is more useful than [`kill_node`] for testing timeout handling and
/// partition scenarios because the node can be resumed without losing state.
#[cfg(feature = "tokio")]
pub fn freeze_node(handle: &RedisServerHandle) {
    let pid = handle.pid().to_string();
    let _ = Command::new("kill").args(["-STOP", &pid]).output();
}

/// Resume a frozen node by sending SIGCONT.
///
/// The process resumes from where it was suspended. Buffered writes and
/// replication will catch up automatically.
#[cfg(feature = "tokio")]
pub fn resume_node(handle: &RedisServerHandle) {
    let pid = handle.pid().to_string();
    let _ = Command::new("kill").args(["-CONT", &pid]).output();
}

/// Pause client connections for a duration using `CLIENT PAUSE`.
///
/// Unlike [`freeze_node`], the server process stays responsive for
/// replication and cluster protocol. Only client commands are delayed.
/// After the duration expires, clients resume automatically.
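///
/// # Example
///
/// A sketch reusing the module-level builder; the 2_000 ms pause is an
/// arbitrary illustrative value.
///
/// ```no_run
/// # use redis_server_wrapper::{RedisCluster, chaos};
/// # async fn example() {
/// # let cluster = RedisCluster::builder().masters(3).replicas_per_master(1).base_port(7100).start().await.unwrap();
/// // Delay client commands on node 0 for two seconds.
/// chaos::slow_down(cluster.node(0), 2_000).await.unwrap();
/// # }
/// ```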
#[cfg(feature = "tokio")]
pub async fn slow_down(handle: &RedisServerHandle, millis: u64) -> Result<String> {
    handle.run(&["CLIENT", "PAUSE", &millis.to_string()]).await
}

/// Trigger a background RDB save.
#[cfg(feature = "tokio")]
pub async fn trigger_save(handle: &RedisServerHandle) -> Result<String> {
    handle.run(&["BGSAVE"]).await
}

/// Flush all data from a node.
#[cfg(feature = "tokio")]
pub async fn flushall(handle: &RedisServerHandle) -> Result<String> {
    handle.run(&["FLUSHALL"]).await
}

// ---------------------------------------------------------------------------
// Cluster-level operations
// ---------------------------------------------------------------------------

/// Kill the master node that owns a given hash slot.
///
/// Queries `CLUSTER NODES` to find which node owns the slot, then sends
/// SIGKILL to that process.
///
/// Returns `Ok(port)` of the killed node, or an error if the slot owner
/// could not be determined.
#[cfg(feature = "tokio")]
pub async fn kill_master_by_slot(cluster: &RedisClusterHandle, slot: u16) -> Result<u16> {
    let owner = find_slot_owner(cluster, slot).await?;
    let pid = owner.pid().to_string();
    let _ = Command::new("kill").args(["-9", &pid]).output();
    Ok(owner.port())
}

/// Kill the master node that owns the hash slot for a given key.
///
/// Computes the slot via `CLUSTER KEYSLOT` then delegates to
/// [`kill_master_by_slot`].
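///
/// # Example
///
/// A sketch; "user:42" is an arbitrary key and the builder mirrors the
/// module-level example.
///
/// ```no_run
/// # use redis_server_wrapper::{RedisCluster, chaos};
/// # async fn example() {
/// # let cluster = RedisCluster::builder().masters(3).replicas_per_master(1).base_port(7100).start().await.unwrap();
/// let killed_port = chaos::kill_master_by_key(&cluster, "user:42").await.unwrap();
/// println!("killed the master on port {killed_port}");
/// # }
/// ```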
#[cfg(feature = "tokio")]
pub async fn kill_master_by_key(cluster: &RedisClusterHandle, key: &str) -> Result<u16> {
    let slot = keyslot(cluster, key).await?;
    kill_master_by_slot(cluster, slot).await
}

/// Freeze the master node that owns a given hash slot.
///
/// Like [`kill_master_by_slot`] but sends SIGSTOP instead of SIGKILL.
/// The node can be recovered with [`recover`].
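///
/// # Example
///
/// A sketch of a freeze-then-thaw cycle; slot 0 is arbitrary and the
/// builder mirrors the module-level example.
///
/// ```no_run
/// # use redis_server_wrapper::{RedisCluster, chaos};
/// # async fn example() {
/// # let cluster = RedisCluster::builder().masters(3).replicas_per_master(1).base_port(7100).start().await.unwrap();
/// let frozen = chaos::freeze_master_by_slot(&cluster, 0).await.unwrap();
/// println!("froze the master on port {frozen}");
/// // ... assert the client times out or reroutes ...
/// chaos::recover(&cluster);
/// # }
/// ```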
#[cfg(feature = "tokio")]
pub async fn freeze_master_by_slot(cluster: &RedisClusterHandle, slot: u16) -> Result<u16> {
    let owner = find_slot_owner(cluster, slot).await?;
    let pid = owner.pid().to_string();
    let _ = Command::new("kill").args(["-STOP", &pid]).output();
    Ok(owner.port())
}

/// Trigger a `CLUSTER FAILOVER` on a replica node.
///
/// If the initial failover fails because the master is down, retries with
/// `CLUSTER FAILOVER FORCE`.
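///
/// # Example
///
/// A sketch; which node index is a replica depends on the builder's
/// topology, so treating `node(3)` as a replica here is an assumption.
///
/// ```no_run
/// # use redis_server_wrapper::{RedisCluster, chaos};
/// # async fn example() {
/// # let cluster = RedisCluster::builder().masters(3).replicas_per_master(1).base_port(7100).start().await.unwrap();
/// // Promote a replica; falls back to FORCE if the plain failover errors.
/// chaos::trigger_failover(cluster.node(3)).await.unwrap();
/// # }
/// ```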
#[cfg(feature = "tokio")]
pub async fn trigger_failover(replica: &RedisServerHandle) -> Result<String> {
    let result = replica.run(&["CLUSTER", "FAILOVER"]).await?;
    if result.contains("ERR") {
        return replica.run(&["CLUSTER", "FAILOVER", "FORCE"]).await;
    }
    Ok(result)
}

/// Resume all nodes in a cluster by sending SIGCONT.
///
/// Useful after freezing nodes for partition simulation. Sends SIGCONT to
/// every node regardless of whether it was frozen.
#[cfg(feature = "tokio")]
pub fn recover(cluster: &RedisClusterHandle) {
    for node in cluster.nodes() {
        let pid = node.pid().to_string();
        let _ = Command::new("kill").args(["-CONT", &pid]).output();
    }
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/// Compute the hash slot for a key via `CLUSTER KEYSLOT`.
#[cfg(feature = "tokio")]
async fn keyslot(cluster: &RedisClusterHandle, key: &str) -> Result<u16> {
    let output = cluster.cli().run(&["CLUSTER", "KEYSLOT", key]).await?;
    let slot: u16 = output
        .trim()
        .parse()
        .map_err(|_| crate::error::Error::Timeout {
            message: format!("could not parse CLUSTER KEYSLOT response: {output}"),
        })?;
    Ok(slot)
}
/// Find the cluster node that owns a given slot by querying `CLUSTER NODES`.
///
/// Each line of `CLUSTER NODES` output describes one node; master lines end
/// with the slot ranges that node serves. We parse the slot ranges from each
/// master line, extract the port from the `ip:port@bus` field, and match it
/// to one of our node handles.
#[cfg(feature = "tokio")]
async fn find_slot_owner(cluster: &RedisClusterHandle, slot: u16) -> Result<&RedisServerHandle> {
    // Use CLUSTER NODES, which gives a simple line-oriented text format.
    // Each line: <id> <ip:port@bus> <flags> <master> <ping> <pong> <epoch> <link> <slot-ranges...>
    for node in cluster.nodes() {
        if let Ok(info) = node.run(&["CLUSTER", "NODES"]).await {
            for line in info.lines() {
                let parts: Vec<&str> = line.split_whitespace().collect();
                if parts.len() < 9 {
                    continue;
                }
                let flags = parts[2];
                if !flags.contains("master") {
                    continue;
                }
                // Check slot ranges (parts[8..]).
                for range_str in &parts[8..] {
                    if slot_in_range(range_str, slot) {
                        // Extract the port from ip:port@bus.
                        if let Some(port) = parse_cluster_node_port(parts[1]) {
                            // Find the matching handle.
                            for n in cluster.nodes() {
                                if n.port() == port {
                                    return Ok(n);
                                }
                            }
                        }
                    }
                }
            }
            // Only need to query one node.
            break;
        }
    }
    Err(crate::error::Error::Timeout {
        message: format!("could not find owner of slot {slot}"),
    })
}

/// Check if a slot falls within a range string like "0-5460" or "5461".
fn slot_in_range(range_str: &str, slot: u16) -> bool {
    // Skip import/migration markers like [123->-node] or [123-<-node].
    if range_str.starts_with('[') {
        return false;
    }
    if let Some((start, end)) = range_str.split_once('-') {
        let Ok(start) = start.parse::<u16>() else {
            return false;
        };
        let Ok(end) = end.parse::<u16>() else {
            return false;
        };
        slot >= start && slot <= end
    } else {
        range_str.parse::<u16>().ok() == Some(slot)
    }
}

/// Parse port from "ip:port@busport" format.
fn parse_cluster_node_port(addr: &str) -> Option<u16> {
    let host_port = addr.split('@').next()?;
    let port_str = host_port.rsplit(':').next()?;
    port_str.parse().ok()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn slot_in_range_single() {
        assert!(slot_in_range("5461", 5461));
        assert!(!slot_in_range("5461", 5462));
    }

    #[test]
    fn slot_in_range_range() {
        assert!(slot_in_range("0-5460", 0));
        assert!(slot_in_range("0-5460", 5460));
        assert!(slot_in_range("0-5460", 1000));
        assert!(!slot_in_range("0-5460", 5461));
    }

    #[test]
    fn slot_in_range_import_marker() {
        assert!(!slot_in_range("[123->-abc]", 123));
        assert!(!slot_in_range("[123-<-abc]", 123));
    }

    #[test]
    fn parse_port_from_cluster_nodes() {
        assert_eq!(parse_cluster_node_port("127.0.0.1:7000@17000"), Some(7000));
        assert_eq!(parse_cluster_node_port("127.0.0.1:7001@17001"), Some(7001));
        assert_eq!(parse_cluster_node_port("garbage"), None);
    }
}