shadow_load_testing/
stress.rs1use shadow_core::{PeerId, PeerInfo};
4use crate::swarm::VirtualSwarm;
5
6pub struct StressTest {
8 iterations: usize,
9 results: Vec<StressResult>,
10}
11
/// Outcome of a single stress scenario.
#[derive(Debug, Clone)]
pub struct StressResult {
    /// Label identifying the scenario.
    pub name: String,
    /// Number of iterations attempted.
    pub iterations: usize,
    /// Number of iterations that succeeded.
    pub successes: usize,
    /// Number of iterations that failed.
    pub failures: usize,
    /// Time recorded for the scenario.
    pub elapsed: std::time::Duration,
}

impl StressResult {
    /// Percentage (0.0 to 100.0) of iterations that succeeded.
    ///
    /// Yields `0.0` when no iterations were recorded, which avoids a
    /// division by zero.
    pub fn success_rate(&self) -> f64 {
        match self.iterations {
            0 => 0.0,
            total => self.successes as f64 / total as f64 * 100.0,
        }
    }

    /// Iterations per second of elapsed time.
    ///
    /// Yields `0.0` when the elapsed time is exactly zero, which avoids a
    /// division by zero.
    pub fn ops_per_sec(&self) -> f64 {
        let seconds = self.elapsed.as_secs_f64();
        if seconds == 0.0 {
            0.0
        } else {
            self.iterations as f64 / seconds
        }
    }
}
38
39impl StressTest {
40 pub fn new(iterations: usize) -> Self {
41 StressTest {
42 iterations,
43 results: Vec::new(),
44 }
45 }
46
47 pub fn iterations(&self) -> usize {
48 self.iterations
49 }
50
51 pub fn results(&self) -> &[StressResult] {
52 &self.results
53 }
54
55 pub fn add_result(&mut self, result: StressResult) {
56 self.results.push(result);
57 }
58
59 pub fn all_passed(&self) -> bool {
60 self.results.iter().all(|r| r.success_rate() > 99.0)
61 }
62}
63
64pub fn stress_dht_operations(swarm: &VirtualSwarm, ops: usize) -> bool {
66 use bytes::Bytes;
67
68 let mut store = dht::DHTStore::default();
69 let mut successes = 0;
70
71 let entries: Vec<([u8; 32], Bytes)> = (0..ops)
73 .map(|i| {
74 let mut key = [0u8; 32];
75 let i_bytes = (i as u64).to_le_bytes();
76 key[..8].copy_from_slice(&i_bytes);
77 let value = Bytes::from(format!("stress-value-{}", i).into_bytes());
78 (key, value)
79 })
80 .collect();
81
82 for (key, value) in &entries {
83 let publisher = [0u8; 32];
84 let stored = dht::StoredValue::new(value.clone(), publisher, 3600);
85 if store.put(*key, stored).is_ok() {
86 successes += 1;
87 }
88 }
89
90 for (key, expected_value) in &entries {
92 if let Some(retrieved) = store.get(key) {
93 if retrieved.data == *expected_value {
94 successes += 1;
95 }
96 }
97 }
98
99 successes >= ops * 2 * 99 / 100
101}
102
103pub fn stress_message_routing(swarm: &VirtualSwarm, messages: usize) -> bool {
105 use rand::Rng;
106 let mut rng = rand::thread_rng();
107 let peer_count = swarm.peer_count();
108
109 if peer_count < 2 {
110 return false;
111 }
112
113 let mut successes = 0;
114
115 for _ in 0..messages {
116 let sender_idx = rng.gen_range(0..peer_count);
117 let target = PeerId::random();
118
119 let closest = swarm.iterative_lookup(sender_idx, &target, 20);
121 if !closest.is_empty() {
122 successes += 1;
123 }
124 }
125
126 successes >= messages * 80 / 100
128}
129
130pub fn stress_storage_operations(items: usize) -> bool {
132 let store = storage::ContentStore::default();
133 let mut successes = 0;
134
135 for i in 0..items {
136 let data = format!("stress-content-{}-{}", i, "x".repeat(100)).into_bytes();
137 match store.store(&data) {
138 Ok(id) => {
139 if let Ok(retrieved) = store.retrieve(&id) {
140 if retrieved.as_ref() == data.as_slice() {
141 successes += 1;
142 }
143 }
144 }
145 Err(_) => {}
146 }
147 }
148
149 successes >= items * 99 / 100
150}
151
152pub fn stress_concurrent_channels(count: usize) -> bool {
154 let mut successes = 0;
155
156 for _ in 0..count {
157 let a_sk = crypto::SigningKey::generate();
158 let a_id = PeerId::random();
159 let b_sk = crypto::SigningKey::generate();
160 let b_id = PeerId::random();
161
162 let a_vk = a_sk.verify_key();
163 let b_vk = b_sk.verify_key();
164
165 let (a_state, a_pub) = messaging::SecureChannel::initiate(a_sk, a_id);
166 let (b_state, b_pub) = messaging::SecureChannel::initiate(b_sk, b_id);
167
168 if let (Ok(a_chan), Ok(b_chan)) = (
169 messaging::SecureChannel::complete(a_state, &b_pub, b_vk),
170 messaging::SecureChannel::complete(b_state, &a_pub, a_vk),
171 ) {
172 let test_msg = messaging::Message::text(
173 a_id,
174 messaging::MessageTarget::Direct(b_id),
175 "stress test message",
176 );
177 if let Ok(env) = a_chan.encrypt_message(&test_msg) {
178 if let Ok(dec) = b_chan.decrypt_message(&env) {
179 if dec.sender == a_id {
180 successes += 1;
181 }
182 }
183 }
184 }
185 }
186
187 successes >= count * 99 / 100
188}
189
190pub fn stress_stego_operations(cycles: usize) -> bool {
192 let mut successes = 0;
193
194 for i in 0..cycles {
195 let data = format!("stego-stress-{}", i).into_bytes();
196 let config = stego_transport::StegoChannelConfig::default();
197 let channel = stego_transport::StegoChannel::new(config);
198
199 match channel.encode(&data) {
200 Ok(encoded) => {
201 match channel.decode(&encoded) {
202 Ok(decoded) => {
203 if decoded.len() >= data.len() && decoded[..data.len()] == data[..] {
204 successes += 1;
205 }
206 }
207 Err(_) => {}
208 }
209 }
210 Err(_) => {}
211 }
212 }
213
214 successes >= cycles * 95 / 100
215}
216
217pub fn stress_replication_churn(peer_count: usize, churn_rounds: usize) -> bool {
219 use dht::replication::{ReplicationManager, ReplicationConfig};
220 use bytes::Bytes;
221
222 let config = ReplicationConfig::default();
223 let mut manager = ReplicationManager::new(config);
224
225 let peers: Vec<PeerInfo> = (0..peer_count)
226 .map(|i| {
227 PeerInfo::new(
228 PeerId::random(),
229 vec![format!("127.0.0.1:{}", 20000 + i)],
230 [0u8; 32],
231 [0u8; 32],
232 )
233 })
234 .collect();
235
236 let data_key = [0xBBu8; 32];
238 let data_value = Bytes::from(b"stress-repl-value".to_vec());
239 let tasks = manager.schedule_replication(data_key, data_value, &peers[..3.min(peer_count)]);
240
241 for task in &tasks {
243 manager.confirm_replication(data_key, task.target_peer);
244 }
245
246 let initial_count = manager.replica_count(&data_key);
248 initial_count > 0
249}
250
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the swarm used by the swarm-based tests: `VirtualSwarm::new(100)`
    /// followed by `bootstrap(10)`.
    fn bootstrapped_swarm() -> VirtualSwarm {
        let mut swarm = VirtualSwarm::new(100);
        swarm.bootstrap(10);
        swarm
    }

    #[test]
    fn test_stress_dht() {
        let swarm = bootstrapped_swarm();
        assert!(stress_dht_operations(&swarm, 100));
    }

    #[test]
    fn test_stress_routing() {
        let swarm = bootstrapped_swarm();
        assert!(stress_message_routing(&swarm, 50));
    }

    #[test]
    fn test_stress_storage() {
        assert!(stress_storage_operations(50));
    }

    #[test]
    fn test_stress_channels() {
        assert!(stress_concurrent_channels(10));
    }

    #[test]
    fn test_stress_stego() {
        assert!(stress_stego_operations(10));
    }

    #[test]
    fn test_stress_replication() {
        assert!(stress_replication_churn(50, 10));
    }

    #[test]
    fn test_stress_test_runner() {
        // A single flawless result must satisfy the aggregate check and report
        // a full success rate with positive throughput.
        let outcome = StressResult {
            name: "test_op".to_string(),
            iterations: 100,
            successes: 100,
            failures: 0,
            elapsed: std::time::Duration::from_millis(50),
        };

        let mut runner = StressTest::new(100);
        runner.add_result(outcome);

        assert!(runner.all_passed());
        let recorded = &runner.results()[0];
        assert_eq!(recorded.success_rate(), 100.0);
        assert!(recorded.ops_per_sec() > 0.0);
    }
}