pdc_core/generator/
reqwest_generator.rs

1use crate::generator::TrafficGenerator;
2use crate::scenario::Scenario;
3
4use crossbeam_channel::{bounded, Receiver, Sender};
5use reqwest;
6use std::time::{Duration, Instant};
7use std::{
8    collections::BTreeMap,
9    sync::{
10        atomic::{AtomicUsize, Ordering},
11        Arc,
12    },
13};
14use tokio::sync::Semaphore;
15use tokio::{self, runtime::Runtime};
/// Very simple reqwest based generator. This generator does not use HTTP multiplexing and is very sensitive to
/// latency. For good performance, it is recommended to test servers running on the same LAN as your
/// machine. This generator will reuse TCP connections for requests, and all requests will be sent from the same IP.
///
/// ![Graph of RequestGenerator](https://i.imgur.com/LKeZhWM.gif)
pub struct ReqwestGenerator<S: Scenario> {
    /// The scenario driving this generator: target send rate over time plus the test duration.
    pub scenario: S,
    /// This is the maximum amount of TCP file descriptors **being used** at any one time.
    /// The actual number of open connections will be higher, because it takes
    /// time to purge old fd's.
    pub max_open_connections: usize,
    /// This controls the amount of worker threads that the backing tokio runtime
    /// uses. Setting this value to zero will create a default multithreaded runtime
    /// equivalent to tokio::runtime::Runtime::new()
    worker_count: usize,
    /// Let this generator warm up to the rate at t=0 before sending test data.
    pub warmup: bool,
    /// The request to send.
    pub req: reqwest::Request,
    /// Sends time series data on the send rate of the generator.
    send_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
    /// Sends the total number of successful requests sent.
    send_num_packets_channel: (Sender<usize>, Receiver<usize>),
    /// Sends the error rate.
    error_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
}
42
43impl<T: Scenario> TrafficGenerator<T> for ReqwestGenerator<T> {
44    fn run_scenario(&mut self) {
45        let rt = self.create_runtime();
46        rt.block_on(self.run_reqwests())
47    }
48    fn fire_hose(&mut self) {
49        let mut suc_rates: BTreeMap<usize, f32> = BTreeMap::new();
50        let mut error_rates: BTreeMap<usize, f32> = BTreeMap::new();
51        let mut sent = 0;
52        let core_ids = core_affinity::get_core_ids().unwrap();
53
54        let num_cores = if self.worker_count == 0 {
55            core_ids.len()
56        } else {
57            self.worker_count
58        };
59
60        let num_sent_channel: (Sender<usize>, Receiver<usize>) = bounded(num_cores * 2);
61        // ThreadID, rate
62        let rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) = bounded(num_cores * 2);
63        let error_rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) =
64            bounded(num_cores * 2);
65        for (i, thread_id) in core_ids.into_iter().enumerate() {
66            let fut = fire_other_thread(
67                self.max_open_connections / num_cores,
68                self.scenario.duration(),
69                self.req.try_clone().unwrap(),
70                num_sent_channel.0.clone(),
71                rate_channel.0.clone(),
72                error_rate_channel.0.clone(),
73                thread_id.id,
74            );
75            std::thread::spawn(move || {
76                core_affinity::set_for_current(thread_id);
77                let rt = tokio::runtime::Builder::new_current_thread()
78                    .enable_all()
79                    .build()
80                    .unwrap();
81                rt.block_on(fut)
82            });
83            if i >= num_cores {
84                break;
85            }
86        }
87
88        let start = Instant::now();
89        while start.elapsed() < Duration::from_millis(self.scenario.duration() as u64) {
90            while let Ok(rate) = rate_channel.1.try_recv() {
91                suc_rates.insert(rate.0, rate.1);
92            }
93            while let Ok(rate) = error_rate_channel.1.try_recv() {
94                error_rates.insert(rate.0, rate.1);
95            }
96            while let Ok(s) = num_sent_channel.1.try_recv() {
97                sent += s;
98            }
99
100            let _ = self.send_num_packets_channel.0.try_send(sent);
101            let mut tot_suc_rate = 0.;
102            for (_, r) in suc_rates.iter() {
103                tot_suc_rate += r;
104            }
105            let _ = self
106                .send_rate_channel
107                .0
108                .try_send((start.elapsed().as_millis() as f32, tot_suc_rate));
109
110            let mut tot_fail_rate = 0.;
111            for (_, r) in error_rates.iter() {
112                tot_fail_rate += r;
113            }
114            let _ = self
115                .error_rate_channel
116                .0
117                .try_send((start.elapsed().as_millis() as f32, tot_fail_rate));
118
119            // This is needed to prevent resource contention over the channels
120            std::thread::sleep(Duration::from_millis(50));
121        }
122    }
123
124    fn set_scenario(&mut self, schem: T) {
125        self.scenario = schem;
126    }
127
128    fn get_scenario(&self) -> &T {
129        &self.scenario
130    }
131
132    fn send_packet(&mut self) {
133        let client = reqwest::blocking::Client::new();
134        let _ = client.get(self.req.url().clone()).send();
135    }
136    fn get_data_rate_channel(&self) -> Receiver<(f32, f32)> {
137        self.send_rate_channel.1.clone()
138    }
139
140    fn get_sent_packets_channel(&self) -> Receiver<usize> {
141        self.send_num_packets_channel.1.clone()
142    }
143
144    fn get_error_rate_channel(&self) -> Receiver<(f32, f32)> {
145        self.error_rate_channel.1.clone()
146    }
147}
148
149impl<S: Scenario> ReqwestGenerator<S> {
150    /// Constructs a new [`ReqwestGenerator`]
151    pub fn new(
152        scenario: S,
153        max_open_connections: usize,
154        worker_count: usize,
155        warmup: bool,
156        req: reqwest::Request,
157    ) -> Self {
158        ReqwestGenerator {
159            scenario,
160            max_open_connections,
161            req,
162            warmup,
163            worker_count,
164            send_rate_channel: bounded(10),
165            send_num_packets_channel: bounded(10),
166            error_rate_channel: bounded(10),
167        }
168    }
169
170    fn create_runtime(&self) -> Runtime {
171        if self.worker_count == 0 {
172            tokio::runtime::Runtime::new().unwrap()
173        } else {
174            tokio::runtime::Builder::new_multi_thread()
175                .worker_threads(self.worker_count)
176                .enable_all()
177                .build()
178                .unwrap()
179        }
180    }
181
182    async fn run_reqwests(&mut self) {
183        let max_tokio_spawn: usize = self.max_open_connections;
184        let mut send_rate: f32 = self.scenario.rate(Duration::from_millis(0));
185        let client = reqwest::Client::new();
186        let semaphore = Arc::new(Semaphore::new(max_tokio_spawn as usize));
187        let num_successful = Arc::new(AtomicUsize::new(0));
188        let num_error = Arc::new(AtomicUsize::new(0));
189        let mut send_delay = Duration::from_micros(1000000 / send_rate as u64);
190        let mut previos_count: usize = 0;
191        let mut previos_error: usize = 0;
192        let mut previos_time = Instant::now();
193        let mut start = Instant::now();
194        let mut elasped = start.elapsed();
195        let test_duration = self.scenario.duration();
196        let mut has_warmed_up = false;
197
198        while elasped.as_millis() < test_duration {
199            for _ in 0..10 {
200                let permit = semaphore.clone().acquire_owned().await.unwrap();
201                let ns = num_successful.clone();
202                let ne = num_error.clone();
203                let fut = client.execute(self.req.try_clone().unwrap());
204                tokio::spawn(async move {
205                    let res = fut.await;
206                    drop(permit);
207                    match res {
208                        Ok(r) => {
209                            if r.status().is_success() {
210                                ns.fetch_add(1, Ordering::Relaxed);
211                            } else {
212                                ne.fetch_add(1, Ordering::Relaxed);
213                            }
214                        }
215                        _ => {
216                            ne.fetch_add(1, Ordering::Relaxed);
217                        }
218                    }
219                });
220            }
221            std::thread::sleep(send_delay);
222
223            let prev_elasped = previos_time.elapsed();
224            if prev_elasped.as_millis() > 100 {
225                elasped = start.elapsed();
226                let suc = num_successful.load(Ordering::Relaxed);
227                let error = num_error.load(Ordering::Relaxed);
228
229                let delta: f64 = suc as f64 - previos_count as f64;
230                let suc_rate = delta as u128 * 1000000 / (prev_elasped.as_micros());
231
232                let delta: f64 = error as f64 - previos_error as f64;
233                let error_rate = delta as u128 * 1000000 / (prev_elasped.as_micros());
234
235                send_rate = self.scenario.rate(elasped);
236                previos_time = Instant::now();
237                previos_count = suc;
238                previos_error = error;
239
240                let rate_delta = suc_rate as f64 - send_rate as f64;
241
242                // Logisticall Tune rate to target
243                // https://en.wikipedia.org/wiki/Logistic_function
244                let adjustment = 2. / (1. + (-(rate_delta as f64) / 10000.).exp());
245
246                send_delay =
247                    Duration::from_nanos((send_delay.as_nanos() as f64 * adjustment) as u64);
248
249                if self.warmup && !has_warmed_up {
250                    if (adjustment - 1.).abs() < 0.01 {
251                        has_warmed_up = true;
252                        start = Instant::now();
253                    }
254                } else {
255                    let _ = self
256                        .send_rate_channel
257                        .0
258                        .try_send((elasped.as_millis() as f32, suc_rate as f32));
259                    let _ = self.send_num_packets_channel.0.try_send(suc);
260                    let _ = self
261                        .error_rate_channel
262                        .0
263                        .try_send((elasped.as_millis() as f32, error_rate as f32));
264                }
265            }
266        }
267
268        // Wait for the last permits to get used
269        let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
270    }
271}
272
273/// It is important that this does not take a "self" parameter, it helps
274/// with cache coherency.
275async fn fire_other_thread(
276    max_tokio_spawn: usize,
277    test_duration: u128,
278    request: reqwest::Request,
279    num_packets_channel: Sender<usize>,
280    packet_rate_channel: Sender<(usize, f32)>,
281    error_rate_channel: Sender<(usize, f32)>,
282    thread_id: usize,
283) {
284    let client = reqwest::Client::new();
285    let semaphore = Arc::new(Semaphore::new(max_tokio_spawn as usize));
286    let num_successful = Arc::new(AtomicUsize::new(0));
287    let num_error = Arc::new(AtomicUsize::new(0));
288
289    let mut previos_count: usize = 0;
290    let mut previos_error: usize = 0;
291    let mut previos_time = Instant::now();
292    let start = Instant::now();
293
294    while start.elapsed().as_millis() < test_duration {
295        for _ in 0..10 {
296            let permit = semaphore.clone().acquire_owned().await.unwrap();
297            let ns = num_successful.clone();
298            let ne = num_error.clone();
299            let fut = client.execute(request.try_clone().unwrap());
300            tokio::spawn(async move {
301                let res = fut.await;
302                drop(permit);
303                match res {
304                    Ok(r) => {
305                        if r.status().is_success() {
306                            ns.fetch_add(1, Ordering::Relaxed);
307                        } else {
308                            ne.fetch_add(1, Ordering::Relaxed);
309                        }
310                    }
311                    _ => {
312                        ne.fetch_add(1, Ordering::Relaxed);
313                    }
314                }
315            });
316        }
317
318        let prev_elasped = previos_time.elapsed();
319        if prev_elasped.as_millis() > 100 {
320            let suc = num_successful.fetch_add(0, Ordering::Relaxed);
321            let error = num_error.load(Ordering::Relaxed);
322
323            let suc_delta: f64 = suc as f64 - previos_count as f64;
324            let rate = suc_delta as u128 * 1000000 / (previos_time.elapsed().as_micros());
325
326            let delta: f64 = error as f64 - previos_error as f64;
327            let error_rate = delta as u128 * 1000000 / (prev_elasped.as_micros());
328
329            previos_time = Instant::now();
330            previos_count = suc;
331            previos_error = error;
332
333            let _ = packet_rate_channel.try_send((thread_id, rate as f32));
334            let _ = num_packets_channel.try_send(suc_delta as usize);
335            let _ = error_rate_channel.try_send((thread_id, error_rate as f32));
336        }
337    }
338
339    // Wait for the last permits to get used
340    let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
341}