pdc_core/generator/hyper.rs

1use hyper::Method;
2use hyper::{client::HttpConnector, Body, Client, Request, Uri};
3
4use super::HttpConfig;
5use super::TrafficGenerator;
6use crate::scenario::Scenario;
7
8use crossbeam_channel::{bounded, Receiver, Sender};
9use std::collections::BTreeMap;
10use std::sync::{
11    atomic::{AtomicUsize, Ordering},
12    Arc,
13};
14use std::time::{Duration, Instant};
15use tokio;
16use tokio::sync::Semaphore;
/// A hyper based generator. This generator uses HTTP multiplexing, and requires that the target server supports
/// HTTP2. Much like the reqwest generator, `HyperGenerator` is very sensitive to
/// latency. For good performance, it is recommended to test servers running on the same LAN as your
/// machine. This Generator will reuse TCP connections for requests, and all requests will be sent from the same IP.
///
/// ![Graph of RequestGenerator](https://i.imgur.com/LKeZhWM.gif)
pub struct HyperGenerator<S: Scenario> {
    /// The scenario that drives the target send rate and the test duration.
    pub scenario: S,
    /// The HTTP request configuration (URL, method, body) sent on every iteration.
    pub request: HttpConfig,
    /// This is the maximum amount of TCP file descriptors **being used** at any one time.
    /// The actual number of open connections will be higher, because it takes
    /// time to purge old fd's.
    pub max_open_connections: usize,
    /// This controls the amount of worker threads that the backing tokio runtime
    /// uses. Setting this value to zero will create a default multithreaded runtime
    /// equivalent to tokio::runtime::Runtime::new()
    worker_count: usize,
    /// Sends time series data on the send rate of the generator.
    /// Each sample is `(elapsed milliseconds, requests per second)`.
    send_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
    /// Sends the total number of successful requests sent.
    send_num_packets_channel: (Sender<usize>, Receiver<usize>),
    /// Sends the error rate as `(elapsed milliseconds, failures per second)`.
    error_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
    /// True if the generator should warm up before sending data.
    warmup: bool,
}
43
44impl<T: Scenario> TrafficGenerator<T> for HyperGenerator<T> {
45    fn run_scenario(&mut self) {
46        let rt = self.create_runtime();
47        rt.block_on(self.run_reqwests())
48    }
49
50    fn fire_hose(&mut self) {
51        let mut suc_rates: BTreeMap<usize, f32> = BTreeMap::new();
52        let mut error_rates: BTreeMap<usize, f32> = BTreeMap::new();
53        let mut sent = 0;
54        let core_ids = core_affinity::get_core_ids().unwrap();
55
56        let num_cores = if self.worker_count == 0 {
57            core_ids.len()
58        } else {
59            self.worker_count
60        };
61
62        let num_sent_channel: (Sender<usize>, Receiver<usize>) = bounded(num_cores * 2);
63        // ThreadID, rate
64        let rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) = bounded(num_cores * 2);
65        let error_rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) =
66            bounded(num_cores * 2);
67        for (i, thread_id) in core_ids.into_iter().enumerate() {
68            let fut = fire_other_thread(
69                self.max_open_connections / num_cores,
70                self.scenario.duration(),
71                num_sent_channel.0.clone(),
72                rate_channel.0.clone(),
73                error_rate_channel.0.clone(),
74                thread_id.id,
75                self.request.clone(),
76            );
77            std::thread::spawn(move || {
78                core_affinity::set_for_current(thread_id);
79                let rt = tokio::runtime::Builder::new_current_thread()
80                    .enable_all()
81                    .build()
82                    .unwrap();
83                rt.block_on(fut)
84            });
85            if i >= num_cores {
86                break;
87            }
88        }
89
90        let start = Instant::now();
91        while start.elapsed() < Duration::from_millis(self.scenario.duration() as u64) {
92            while let Ok(rate) = rate_channel.1.try_recv() {
93                suc_rates.insert(rate.0, rate.1);
94            }
95            while let Ok(rate) = error_rate_channel.1.try_recv() {
96                error_rates.insert(rate.0, rate.1);
97            }
98            while let Ok(s) = num_sent_channel.1.try_recv() {
99                sent += s;
100            }
101
102            let _ = self.send_num_packets_channel.0.try_send(sent);
103            let mut tot_suc_rate = 0.;
104            for (_, r) in suc_rates.iter() {
105                tot_suc_rate += r;
106            }
107            let _ = self
108                .send_rate_channel
109                .0
110                .try_send((start.elapsed().as_millis() as f32, tot_suc_rate));
111
112            let mut tot_fail_rate = 0.;
113            for (_, r) in error_rates.iter() {
114                tot_fail_rate += r;
115            }
116            let _ = self
117                .error_rate_channel
118                .0
119                .try_send((start.elapsed().as_millis() as f32, tot_fail_rate));
120            // This is needed to prevent resource contention over the channels
121            std::thread::sleep(Duration::from_millis(50));
122        }
123    }
124
125    fn set_scenario(&mut self, schem: T) {
126        self.scenario = schem;
127    }
128
129    fn get_scenario(&self) -> &T {
130        &self.scenario
131    }
132
133    fn send_packet(&mut self) {
134        todo!()
135    }
136    fn get_data_rate_channel(&self) -> Receiver<(f32, f32)> {
137        self.send_rate_channel.1.clone()
138    }
139
140    fn get_sent_packets_channel(&self) -> Receiver<usize> {
141        self.send_num_packets_channel.1.clone()
142    }
143
144    fn get_error_rate_channel(&self) -> Receiver<(f32, f32)> {
145        self.error_rate_channel.1.clone()
146    }
147}
148
149impl<S: Scenario> HyperGenerator<S> {
150    /// Constructs a new [`HyperGenerator`]
151    pub fn new(
152        scenario: S,
153        request: HttpConfig,
154        max_open_connections: usize,
155        worker_count: usize,
156        warmup: bool,
157    ) -> Self {
158        HyperGenerator {
159            scenario,
160            request,
161            max_open_connections,
162            worker_count,
163            send_rate_channel: bounded(worker_count * 2),
164            send_num_packets_channel: bounded(worker_count * 2),
165            error_rate_channel: bounded(worker_count * 2),
166            warmup,
167        }
168    }
169
170    fn create_runtime(&self) -> tokio::runtime::Runtime {
171        if self.worker_count == 0 {
172            tokio::runtime::Runtime::new().unwrap()
173        } else {
174            tokio::runtime::Builder::new_multi_thread()
175                .worker_threads(self.worker_count)
176                .enable_all()
177                .build()
178                .unwrap()
179        }
180    }
181
182    //TODO: Give this the same makeover as fire method
183    async fn run_reqwests(&mut self) {
184        let max_tokio_spawn = self.max_open_connections;
185        let mut send_rate: f32 = self.scenario.rate(Duration::from_millis(0));
186        let test_duration = self.scenario.duration();
187        let client: Client<HttpConnector, Body> = Client::builder()
188            .pool_idle_timeout(Duration::from_secs(30))
189            .http2_only(true)
190            .build_http();
191
192        let semaphore = Arc::new(Semaphore::new(max_tokio_spawn as usize));
193        let num_successful = Arc::new(AtomicUsize::new(0));
194        let num_sent = Arc::new(AtomicUsize::new(0));
195        let num_fail = Arc::new(AtomicUsize::new(0));
196
197        let mut previos_suc: usize = 0;
198        let mut previos_fail: usize = 0;
199
200        let mut previos_time = Instant::now();
201        let mut start = Instant::now();
202        let mut send_delay = Duration::from_micros(1000000 / send_rate as u64);
203        let mut has_warmed_up = false;
204
205        let uri = Uri::try_from(self.request.url.clone()).unwrap();
206        let method = Method::try_from(self.request.method.as_str()).unwrap();
207
208        while start.elapsed().as_millis() < test_duration {
209            for _ in 0..100 {
210                let permit = semaphore.clone().acquire_owned().await.unwrap();
211                let suc_clone = num_successful.clone();
212                let tot_clone = num_sent.clone();
213                let fail_clone = num_fail.clone();
214
215                let req = Request::builder()
216                    .uri(uri.clone())
217                    .method(method.clone())
218                    .body(hyper::Body::from(self.request.body.clone()))
219                    .unwrap();
220                let fut = client.request(req);
221                tokio::spawn(async move {
222                    let res = fut.await;
223
224                    tot_clone.fetch_add(1, Ordering::Relaxed);
225                    match res {
226                        Ok(r) => {
227                            if r.status().is_success() {
228                                suc_clone.fetch_add(1, Ordering::Relaxed);
229                            } else {
230                                fail_clone.fetch_add(1, Ordering::Relaxed);
231                            }
232                        }
233                        Err(_e) => {
234                            fail_clone.fetch_add(1, Ordering::Relaxed);
235                        }
236                    }
237                    drop(permit);
238                });
239            }
240            std::thread::sleep(send_delay);
241
242            let prev_elasped = previos_time.elapsed();
243            if prev_elasped.as_millis() > 100 {
244                send_rate = self.scenario.rate(start.elapsed());
245                let suc = num_successful.load(Ordering::Relaxed);
246                let suc_delta: f64 = suc as f64 - previos_suc as f64;
247                let suc_rate = suc_delta as u128 * 1000000 / (prev_elasped.as_micros());
248
249                let adjustment = 2. / (1. + (-(suc_rate as f64 - send_rate as f64) / 10000.).exp());
250
251                send_delay =
252                    Duration::from_nanos((send_delay.as_nanos() as f64 * adjustment) as u64);
253
254                let fail = num_fail.load(Ordering::Relaxed);
255                let fail_delta: f64 = fail as f64 - previos_fail as f64;
256                let fail_rate = fail_delta as u128 * 1000000 / (prev_elasped.as_micros());
257
258                let test_time = start.elapsed().as_millis();
259
260                if self.warmup && !has_warmed_up {
261                    if (adjustment - 1.).abs() < 0.01 {
262                        has_warmed_up = true;
263                        start = Instant::now();
264                    }
265                } else {
266                    let _ = self
267                        .send_rate_channel
268                        .0
269                        .try_send((test_time as f32, suc_rate as f32));
270                    let _ = self
271                        .error_rate_channel
272                        .0
273                        .try_send((test_time as f32, fail_rate as f32));
274                    let _ = self.send_num_packets_channel.0.try_send(suc as usize);
275                }
276
277                previos_suc = suc;
278                previos_fail = fail;
279                previos_time = Instant::now();
280            }
281        }
282
283        // Wait for the last permits to get used
284        let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
285    }
286}
287
288async fn fire_other_thread(
289    max_tokio_spawn: usize,
290    test_duration: u128,
291    num_packets_channel: Sender<usize>,
292    packet_rate_channel: Sender<(usize, f32)>,
293    error_rate_channel: Sender<(usize, f32)>,
294    thread_id: usize,
295    rconf: HttpConfig,
296) {
297    let client: Client<HttpConnector, Body> = Client::builder()
298        .pool_idle_timeout(Duration::from_secs(30))
299        .http2_only(true)
300        .build_http();
301
302    let semaphore = Arc::new(Semaphore::new(max_tokio_spawn as usize));
303    let num_successful = Arc::new(AtomicUsize::new(0));
304    let num_sent = Arc::new(AtomicUsize::new(0));
305    let num_fail = Arc::new(AtomicUsize::new(0));
306
307    let mut previos_suc: usize = 0;
308    let mut previos_fail: usize = 0;
309    let mut previos_time = Instant::now();
310    let start = Instant::now();
311
312    let uri = Uri::try_from(rconf.url).unwrap();
313    let method = Method::try_from(rconf.method.as_str()).unwrap();
314
315    while start.elapsed().as_millis() < test_duration {
316        for _ in 0..100 {
317            let permit = semaphore.clone().acquire_owned().await.unwrap();
318            let suc_clone = num_successful.clone();
319            let tot_clone = num_sent.clone();
320            let fail_clone = num_fail.clone();
321
322            let req = Request::builder()
323                .uri(uri.clone())
324                .method(method.clone())
325                .body(hyper::Body::from(rconf.body.clone()))
326                .unwrap();
327            let fut = client.request(req);
328            tokio::spawn(async move {
329                let res = fut.await;
330
331                tot_clone.fetch_add(1, Ordering::Relaxed);
332                match res {
333                    Ok(r) => {
334                        if r.status().is_success() {
335                            suc_clone.fetch_add(1, Ordering::Relaxed);
336                        } else {
337                            fail_clone.fetch_add(1, Ordering::Relaxed);
338                        }
339                    }
340                    Err(_e) => {
341                        fail_clone.fetch_add(1, Ordering::Relaxed);
342                    }
343                }
344                drop(permit);
345            });
346        }
347
348        let prev_elasped = previos_time.elapsed();
349        if prev_elasped.as_millis() > 100 {
350            let suc = num_successful.load(Ordering::Relaxed);
351            let suc_delta: f64 = suc as f64 - previos_suc as f64;
352            let suc_rate = suc_delta as u128 * 1000000 / (prev_elasped.as_micros());
353
354            let fail = num_fail.load(Ordering::Relaxed);
355            let fail_delta: f64 = fail as f64 - previos_fail as f64;
356            let fail_rate = fail_delta as u128 * 1000000 / (prev_elasped.as_micros());
357
358            let _ = packet_rate_channel.try_send((thread_id, suc_rate as f32));
359            let _ = error_rate_channel.try_send((thread_id, fail_rate as f32));
360            let _ = num_packets_channel.try_send(suc_delta as usize);
361
362            previos_suc = suc;
363            previos_fail = fail;
364            previos_time = Instant::now();
365        }
366    }
367
368    // Wait for the last permits to get used
369    let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
370}