duroxide_pg_opt/fault_injection.rs
1//! Fault injection for testing resilience scenarios.
2//!
3//! This module provides a comprehensive fault injection mechanism to simulate
4//! failure conditions in tests without complex runtime manipulation.
5//!
6//! ## Supported Faults
7//!
8//! - `disable_notifier`: Prevents the notifier thread from starting
9//! - `refresh_delay`: Adds artificial delay to refresh queries
10//! - `force_reconnect`: Triggers a reconnection in the notifier
11//! - `refresh_should_error`: Makes the next refresh query fail
12//! - `notifier_should_panic`: Simulates a panic in the notifier thread
13
14use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
15use std::time::Duration;
16
17/// Fault injector for testing resilience scenarios.
18///
19/// Thread-safe structure that can be shared across provider and tests
20/// to inject faults dynamically during test execution.
21#[derive(Debug, Default)]
22pub struct FaultInjector {
23 /// If true, the notifier thread will not be spawned
24 notifier_disabled: AtomicBool,
25
26 /// Artificial delay (in milliseconds) to add to refresh queries
27 refresh_delay_ms: AtomicU64,
28
29 /// If true, forces the notifier to simulate a connection drop and reconnect
30 force_reconnect: AtomicBool,
31
32 /// If true, the next refresh query should return an error
33 refresh_should_error: AtomicBool,
34
35 /// If true, simulates a panic in the notifier thread
36 notifier_should_panic: AtomicBool,
37
38 /// Clock skew offset in milliseconds (can be positive or negative).
39 /// This value is added to all time calculations in the provider,
40 /// simulating a node whose clock is ahead (positive) or behind (negative).
41 clock_skew_ms: AtomicI64,
42}
43
44impl FaultInjector {
45 /// Create a new fault injector with no faults enabled.
46 pub fn new() -> Self {
47 Self::default()
48 }
49
50 // =========================================================================
51 // Notifier Control
52 // =========================================================================
53
54 /// Disable the notifier - prevents it from being spawned.
55 ///
56 /// When called before provider creation, the notifier thread will not start,
57 /// simulating a notifier failure scenario.
58 pub fn disable_notifier(&self) {
59 self.notifier_disabled.store(true, Ordering::SeqCst);
60 }
61
62 /// Check if the notifier is disabled.
63 pub fn is_notifier_disabled(&self) -> bool {
64 self.notifier_disabled.load(Ordering::SeqCst)
65 }
66
67 /// Set whether the notifier should panic on next iteration.
68 pub fn set_notifier_should_panic(&self, should_panic: bool) {
69 self.notifier_should_panic
70 .store(should_panic, Ordering::SeqCst);
71 }
72
73 /// Check if the notifier should panic.
74 pub fn should_notifier_panic(&self) -> bool {
75 self.notifier_should_panic.swap(false, Ordering::SeqCst)
76 }
77
78 // =========================================================================
79 // Refresh Query Control
80 // =========================================================================
81
82 /// Set artificial delay for refresh queries (simulates slow database).
83 pub fn set_refresh_delay(&self, delay: Duration) {
84 self.refresh_delay_ms
85 .store(delay.as_millis() as u64, Ordering::SeqCst);
86 }
87
88 /// Get the current refresh delay.
89 pub fn get_refresh_delay(&self) -> Duration {
90 Duration::from_millis(self.refresh_delay_ms.load(Ordering::SeqCst))
91 }
92
93 /// Set whether the next refresh query should return an error.
94 pub fn set_refresh_should_error(&self, should_error: bool) {
95 self.refresh_should_error
96 .store(should_error, Ordering::SeqCst);
97 }
98
99 /// Check and consume the refresh error flag.
100 pub fn should_refresh_error(&self) -> bool {
101 self.refresh_should_error.swap(false, Ordering::SeqCst)
102 }
103
104 // =========================================================================
105 // Connection Control
106 // =========================================================================
107
108 /// Force the notifier to reconnect (simulates connection drop).
109 pub fn trigger_reconnect(&self) {
110 self.force_reconnect.store(true, Ordering::SeqCst);
111 }
112
113 /// Check and consume the reconnect flag.
114 pub fn should_reconnect(&self) -> bool {
115 self.force_reconnect.swap(false, Ordering::SeqCst)
116 }
117
118 // =========================================================================
119 // Clock Skew Simulation
120 // =========================================================================
121
122 /// Set clock skew offset in milliseconds.
123 ///
124 /// Positive values simulate a clock that is ahead (future timestamps).
125 /// Negative values simulate a clock that is behind (past timestamps).
126 ///
127 /// This offset is added to all `now_millis()` calculations in the provider,
128 /// allowing simulation of clock drift between nodes.
129 ///
130 /// # Example
131 /// ```
132 /// use duroxide_pg_opt::FaultInjector;
133 /// use std::time::Duration;
134 ///
135 /// let fi = FaultInjector::new();
136 /// // Simulate clock 500ms ahead
137 /// fi.set_clock_skew(Duration::from_millis(500));
138 ///
139 /// // Simulate clock 200ms behind
140 /// fi.set_clock_skew_signed(-200);
141 /// ```
142 pub fn set_clock_skew(&self, skew: Duration) {
143 self.clock_skew_ms
144 .store(skew.as_millis() as i64, Ordering::SeqCst);
145 }
146
147 /// Set clock skew offset in milliseconds (signed).
148 ///
149 /// Positive values simulate a clock that is ahead.
150 /// Negative values simulate a clock that is behind.
151 pub fn set_clock_skew_signed(&self, skew_ms: i64) {
152 self.clock_skew_ms.store(skew_ms, Ordering::SeqCst);
153 }
154
155 /// Get the current clock skew offset in milliseconds.
156 pub fn get_clock_skew_ms(&self) -> i64 {
157 self.clock_skew_ms.load(Ordering::SeqCst)
158 }
159
160 /// Clear the clock skew (reset to 0).
161 pub fn clear_clock_skew(&self) {
162 self.clock_skew_ms.store(0, Ordering::SeqCst);
163 }
164}
165
166#[cfg(test)]
167mod tests {
168 use super::*;
169
170 #[test]
171 fn test_fault_injector_default() {
172 let fi = FaultInjector::new();
173 assert!(!fi.is_notifier_disabled());
174 assert_eq!(fi.get_refresh_delay(), Duration::ZERO);
175 assert!(!fi.should_reconnect());
176 }
177
178 #[test]
179 fn test_disable_notifier() {
180 let fi = FaultInjector::new();
181 fi.disable_notifier();
182 assert!(fi.is_notifier_disabled());
183 }
184
185 #[test]
186 fn test_refresh_delay() {
187 let fi = FaultInjector::new();
188 fi.set_refresh_delay(Duration::from_secs(5));
189 assert_eq!(fi.get_refresh_delay(), Duration::from_secs(5));
190 }
191
192 #[test]
193 fn test_reconnect_flag() {
194 let fi = FaultInjector::new();
195 assert!(!fi.should_reconnect());
196 fi.trigger_reconnect();
197 assert!(fi.should_reconnect());
198 // Flag should be consumed
199 assert!(!fi.should_reconnect());
200 }
201
202 #[test]
203 fn test_refresh_error_flag() {
204 let fi = FaultInjector::new();
205 assert!(!fi.should_refresh_error());
206 fi.set_refresh_should_error(true);
207 assert!(fi.should_refresh_error());
208 // Flag should be consumed
209 assert!(!fi.should_refresh_error());
210 }
211
212 #[test]
213 fn test_notifier_panic_flag() {
214 let fi = FaultInjector::new();
215 assert!(!fi.should_notifier_panic());
216 fi.set_notifier_should_panic(true);
217 assert!(fi.should_notifier_panic());
218 // Flag should be consumed
219 assert!(!fi.should_notifier_panic());
220 }
221
222 #[test]
223 fn test_clock_skew() {
224 let fi = FaultInjector::new();
225 assert_eq!(fi.get_clock_skew_ms(), 0);
226
227 // Positive skew (clock ahead)
228 fi.set_clock_skew(Duration::from_millis(500));
229 assert_eq!(fi.get_clock_skew_ms(), 500);
230
231 // Negative skew (clock behind)
232 fi.set_clock_skew_signed(-200);
233 assert_eq!(fi.get_clock_skew_ms(), -200);
234
235 // Clear
236 fi.clear_clock_skew();
237 assert_eq!(fi.get_clock_skew_ms(), 0);
238 }
239}