1use hyper::Method;
2use hyper::{client::HttpConnector, Body, Client, Request, Uri};
3
4use super::HttpConfig;
5use super::TrafficGenerator;
6use crate::scenario::Scenario;
7
8use crossbeam_channel::{bounded, Receiver, Sender};
9use std::collections::BTreeMap;
10use std::sync::{
11 atomic::{AtomicUsize, Ordering},
12 Arc,
13};
14use std::time::{Duration, Instant};
15use tokio;
16use tokio::sync::Semaphore;
17pub struct HyperGenerator<S: Scenario> {
24 pub scenario: S,
25 pub request: HttpConfig,
26 pub max_open_connections: usize,
30 worker_count: usize,
34 send_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
36 send_num_packets_channel: (Sender<usize>, Receiver<usize>),
38 error_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
40 warmup: bool,
42}
43
44impl<T: Scenario> TrafficGenerator<T> for HyperGenerator<T> {
45 fn run_scenario(&mut self) {
46 let rt = self.create_runtime();
47 rt.block_on(self.run_reqwests())
48 }
49
50 fn fire_hose(&mut self) {
51 let mut suc_rates: BTreeMap<usize, f32> = BTreeMap::new();
52 let mut error_rates: BTreeMap<usize, f32> = BTreeMap::new();
53 let mut sent = 0;
54 let core_ids = core_affinity::get_core_ids().unwrap();
55
56 let num_cores = if self.worker_count == 0 {
57 core_ids.len()
58 } else {
59 self.worker_count
60 };
61
62 let num_sent_channel: (Sender<usize>, Receiver<usize>) = bounded(num_cores * 2);
63 let rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) = bounded(num_cores * 2);
65 let error_rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) =
66 bounded(num_cores * 2);
67 for (i, thread_id) in core_ids.into_iter().enumerate() {
68 let fut = fire_other_thread(
69 self.max_open_connections / num_cores,
70 self.scenario.duration(),
71 num_sent_channel.0.clone(),
72 rate_channel.0.clone(),
73 error_rate_channel.0.clone(),
74 thread_id.id,
75 self.request.clone(),
76 );
77 std::thread::spawn(move || {
78 core_affinity::set_for_current(thread_id);
79 let rt = tokio::runtime::Builder::new_current_thread()
80 .enable_all()
81 .build()
82 .unwrap();
83 rt.block_on(fut)
84 });
85 if i >= num_cores {
86 break;
87 }
88 }
89
90 let start = Instant::now();
91 while start.elapsed() < Duration::from_millis(self.scenario.duration() as u64) {
92 while let Ok(rate) = rate_channel.1.try_recv() {
93 suc_rates.insert(rate.0, rate.1);
94 }
95 while let Ok(rate) = error_rate_channel.1.try_recv() {
96 error_rates.insert(rate.0, rate.1);
97 }
98 while let Ok(s) = num_sent_channel.1.try_recv() {
99 sent += s;
100 }
101
102 let _ = self.send_num_packets_channel.0.try_send(sent);
103 let mut tot_suc_rate = 0.;
104 for (_, r) in suc_rates.iter() {
105 tot_suc_rate += r;
106 }
107 let _ = self
108 .send_rate_channel
109 .0
110 .try_send((start.elapsed().as_millis() as f32, tot_suc_rate));
111
112 let mut tot_fail_rate = 0.;
113 for (_, r) in error_rates.iter() {
114 tot_fail_rate += r;
115 }
116 let _ = self
117 .error_rate_channel
118 .0
119 .try_send((start.elapsed().as_millis() as f32, tot_fail_rate));
120 std::thread::sleep(Duration::from_millis(50));
122 }
123 }
124
125 fn set_scenario(&mut self, schem: T) {
126 self.scenario = schem;
127 }
128
129 fn get_scenario(&self) -> &T {
130 &self.scenario
131 }
132
133 fn send_packet(&mut self) {
134 todo!()
135 }
136 fn get_data_rate_channel(&self) -> Receiver<(f32, f32)> {
137 self.send_rate_channel.1.clone()
138 }
139
140 fn get_sent_packets_channel(&self) -> Receiver<usize> {
141 self.send_num_packets_channel.1.clone()
142 }
143
144 fn get_error_rate_channel(&self) -> Receiver<(f32, f32)> {
145 self.error_rate_channel.1.clone()
146 }
147}
148
149impl<S: Scenario> HyperGenerator<S> {
150 pub fn new(
152 scenario: S,
153 request: HttpConfig,
154 max_open_connections: usize,
155 worker_count: usize,
156 warmup: bool,
157 ) -> Self {
158 HyperGenerator {
159 scenario,
160 request,
161 max_open_connections,
162 worker_count,
163 send_rate_channel: bounded(worker_count * 2),
164 send_num_packets_channel: bounded(worker_count * 2),
165 error_rate_channel: bounded(worker_count * 2),
166 warmup,
167 }
168 }
169
170 fn create_runtime(&self) -> tokio::runtime::Runtime {
171 if self.worker_count == 0 {
172 tokio::runtime::Runtime::new().unwrap()
173 } else {
174 tokio::runtime::Builder::new_multi_thread()
175 .worker_threads(self.worker_count)
176 .enable_all()
177 .build()
178 .unwrap()
179 }
180 }
181
182 async fn run_reqwests(&mut self) {
184 let max_tokio_spawn = self.max_open_connections;
185 let mut send_rate: f32 = self.scenario.rate(Duration::from_millis(0));
186 let test_duration = self.scenario.duration();
187 let client: Client<HttpConnector, Body> = Client::builder()
188 .pool_idle_timeout(Duration::from_secs(30))
189 .http2_only(true)
190 .build_http();
191
192 let semaphore = Arc::new(Semaphore::new(max_tokio_spawn as usize));
193 let num_successful = Arc::new(AtomicUsize::new(0));
194 let num_sent = Arc::new(AtomicUsize::new(0));
195 let num_fail = Arc::new(AtomicUsize::new(0));
196
197 let mut previos_suc: usize = 0;
198 let mut previos_fail: usize = 0;
199
200 let mut previos_time = Instant::now();
201 let mut start = Instant::now();
202 let mut send_delay = Duration::from_micros(1000000 / send_rate as u64);
203 let mut has_warmed_up = false;
204
205 let uri = Uri::try_from(self.request.url.clone()).unwrap();
206 let method = Method::try_from(self.request.method.as_str()).unwrap();
207
208 while start.elapsed().as_millis() < test_duration {
209 for _ in 0..100 {
210 let permit = semaphore.clone().acquire_owned().await.unwrap();
211 let suc_clone = num_successful.clone();
212 let tot_clone = num_sent.clone();
213 let fail_clone = num_fail.clone();
214
215 let req = Request::builder()
216 .uri(uri.clone())
217 .method(method.clone())
218 .body(hyper::Body::from(self.request.body.clone()))
219 .unwrap();
220 let fut = client.request(req);
221 tokio::spawn(async move {
222 let res = fut.await;
223
224 tot_clone.fetch_add(1, Ordering::Relaxed);
225 match res {
226 Ok(r) => {
227 if r.status().is_success() {
228 suc_clone.fetch_add(1, Ordering::Relaxed);
229 } else {
230 fail_clone.fetch_add(1, Ordering::Relaxed);
231 }
232 }
233 Err(_e) => {
234 fail_clone.fetch_add(1, Ordering::Relaxed);
235 }
236 }
237 drop(permit);
238 });
239 }
240 std::thread::sleep(send_delay);
241
242 let prev_elasped = previos_time.elapsed();
243 if prev_elasped.as_millis() > 100 {
244 send_rate = self.scenario.rate(start.elapsed());
245 let suc = num_successful.load(Ordering::Relaxed);
246 let suc_delta: f64 = suc as f64 - previos_suc as f64;
247 let suc_rate = suc_delta as u128 * 1000000 / (prev_elasped.as_micros());
248
249 let adjustment = 2. / (1. + (-(suc_rate as f64 - send_rate as f64) / 10000.).exp());
250
251 send_delay =
252 Duration::from_nanos((send_delay.as_nanos() as f64 * adjustment) as u64);
253
254 let fail = num_fail.load(Ordering::Relaxed);
255 let fail_delta: f64 = fail as f64 - previos_fail as f64;
256 let fail_rate = fail_delta as u128 * 1000000 / (prev_elasped.as_micros());
257
258 let test_time = start.elapsed().as_millis();
259
260 if self.warmup && !has_warmed_up {
261 if (adjustment - 1.).abs() < 0.01 {
262 has_warmed_up = true;
263 start = Instant::now();
264 }
265 } else {
266 let _ = self
267 .send_rate_channel
268 .0
269 .try_send((test_time as f32, suc_rate as f32));
270 let _ = self
271 .error_rate_channel
272 .0
273 .try_send((test_time as f32, fail_rate as f32));
274 let _ = self.send_num_packets_channel.0.try_send(suc as usize);
275 }
276
277 previos_suc = suc;
278 previos_fail = fail;
279 previos_time = Instant::now();
280 }
281 }
282
283 let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
285 }
286}
287
288async fn fire_other_thread(
289 max_tokio_spawn: usize,
290 test_duration: u128,
291 num_packets_channel: Sender<usize>,
292 packet_rate_channel: Sender<(usize, f32)>,
293 error_rate_channel: Sender<(usize, f32)>,
294 thread_id: usize,
295 rconf: HttpConfig,
296) {
297 let client: Client<HttpConnector, Body> = Client::builder()
298 .pool_idle_timeout(Duration::from_secs(30))
299 .http2_only(true)
300 .build_http();
301
302 let semaphore = Arc::new(Semaphore::new(max_tokio_spawn as usize));
303 let num_successful = Arc::new(AtomicUsize::new(0));
304 let num_sent = Arc::new(AtomicUsize::new(0));
305 let num_fail = Arc::new(AtomicUsize::new(0));
306
307 let mut previos_suc: usize = 0;
308 let mut previos_fail: usize = 0;
309 let mut previos_time = Instant::now();
310 let start = Instant::now();
311
312 let uri = Uri::try_from(rconf.url).unwrap();
313 let method = Method::try_from(rconf.method.as_str()).unwrap();
314
315 while start.elapsed().as_millis() < test_duration {
316 for _ in 0..100 {
317 let permit = semaphore.clone().acquire_owned().await.unwrap();
318 let suc_clone = num_successful.clone();
319 let tot_clone = num_sent.clone();
320 let fail_clone = num_fail.clone();
321
322 let req = Request::builder()
323 .uri(uri.clone())
324 .method(method.clone())
325 .body(hyper::Body::from(rconf.body.clone()))
326 .unwrap();
327 let fut = client.request(req);
328 tokio::spawn(async move {
329 let res = fut.await;
330
331 tot_clone.fetch_add(1, Ordering::Relaxed);
332 match res {
333 Ok(r) => {
334 if r.status().is_success() {
335 suc_clone.fetch_add(1, Ordering::Relaxed);
336 } else {
337 fail_clone.fetch_add(1, Ordering::Relaxed);
338 }
339 }
340 Err(_e) => {
341 fail_clone.fetch_add(1, Ordering::Relaxed);
342 }
343 }
344 drop(permit);
345 });
346 }
347
348 let prev_elasped = previos_time.elapsed();
349 if prev_elasped.as_millis() > 100 {
350 let suc = num_successful.load(Ordering::Relaxed);
351 let suc_delta: f64 = suc as f64 - previos_suc as f64;
352 let suc_rate = suc_delta as u128 * 1000000 / (prev_elasped.as_micros());
353
354 let fail = num_fail.load(Ordering::Relaxed);
355 let fail_delta: f64 = fail as f64 - previos_fail as f64;
356 let fail_rate = fail_delta as u128 * 1000000 / (prev_elasped.as_micros());
357
358 let _ = packet_rate_channel.try_send((thread_id, suc_rate as f32));
359 let _ = error_rate_channel.try_send((thread_id, fail_rate as f32));
360 let _ = num_packets_channel.try_send(suc_delta as usize);
361
362 previos_suc = suc;
363 previos_fail = fail;
364 previos_time = Instant::now();
365 }
366 }
367
368 let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
370}