pdc_core/generator/
reqwest_generator.rs1use crate::generator::TrafficGenerator;
2use crate::scenario::Scenario;
3
4use crossbeam_channel::{bounded, Receiver, Sender};
5use reqwest;
6use std::time::{Duration, Instant};
7use std::{
8 collections::BTreeMap,
9 sync::{
10 atomic::{AtomicUsize, Ordering},
11 Arc,
12 },
13};
14use tokio::sync::Semaphore;
15use tokio::{self, runtime::Runtime};
16pub struct ReqwestGenerator<S: Scenario> {
22 pub scenario: S,
23 pub max_open_connections: usize,
27 worker_count: usize,
31 pub warmup: bool,
33 pub req: reqwest::Request,
35 send_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
37 send_num_packets_channel: (Sender<usize>, Receiver<usize>),
39 error_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
41}
42
43impl<T: Scenario> TrafficGenerator<T> for ReqwestGenerator<T> {
44 fn run_scenario(&mut self) {
45 let rt = self.create_runtime();
46 rt.block_on(self.run_reqwests())
47 }
48 fn fire_hose(&mut self) {
49 let mut suc_rates: BTreeMap<usize, f32> = BTreeMap::new();
50 let mut error_rates: BTreeMap<usize, f32> = BTreeMap::new();
51 let mut sent = 0;
52 let core_ids = core_affinity::get_core_ids().unwrap();
53
54 let num_cores = if self.worker_count == 0 {
55 core_ids.len()
56 } else {
57 self.worker_count
58 };
59
60 let num_sent_channel: (Sender<usize>, Receiver<usize>) = bounded(num_cores * 2);
61 let rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) = bounded(num_cores * 2);
63 let error_rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) =
64 bounded(num_cores * 2);
65 for (i, thread_id) in core_ids.into_iter().enumerate() {
66 let fut = fire_other_thread(
67 self.max_open_connections / num_cores,
68 self.scenario.duration(),
69 self.req.try_clone().unwrap(),
70 num_sent_channel.0.clone(),
71 rate_channel.0.clone(),
72 error_rate_channel.0.clone(),
73 thread_id.id,
74 );
75 std::thread::spawn(move || {
76 core_affinity::set_for_current(thread_id);
77 let rt = tokio::runtime::Builder::new_current_thread()
78 .enable_all()
79 .build()
80 .unwrap();
81 rt.block_on(fut)
82 });
83 if i >= num_cores {
84 break;
85 }
86 }
87
88 let start = Instant::now();
89 while start.elapsed() < Duration::from_millis(self.scenario.duration() as u64) {
90 while let Ok(rate) = rate_channel.1.try_recv() {
91 suc_rates.insert(rate.0, rate.1);
92 }
93 while let Ok(rate) = error_rate_channel.1.try_recv() {
94 error_rates.insert(rate.0, rate.1);
95 }
96 while let Ok(s) = num_sent_channel.1.try_recv() {
97 sent += s;
98 }
99
100 let _ = self.send_num_packets_channel.0.try_send(sent);
101 let mut tot_suc_rate = 0.;
102 for (_, r) in suc_rates.iter() {
103 tot_suc_rate += r;
104 }
105 let _ = self
106 .send_rate_channel
107 .0
108 .try_send((start.elapsed().as_millis() as f32, tot_suc_rate));
109
110 let mut tot_fail_rate = 0.;
111 for (_, r) in error_rates.iter() {
112 tot_fail_rate += r;
113 }
114 let _ = self
115 .error_rate_channel
116 .0
117 .try_send((start.elapsed().as_millis() as f32, tot_fail_rate));
118
119 std::thread::sleep(Duration::from_millis(50));
121 }
122 }
123
124 fn set_scenario(&mut self, schem: T) {
125 self.scenario = schem;
126 }
127
128 fn get_scenario(&self) -> &T {
129 &self.scenario
130 }
131
132 fn send_packet(&mut self) {
133 let client = reqwest::blocking::Client::new();
134 let _ = client.get(self.req.url().clone()).send();
135 }
136 fn get_data_rate_channel(&self) -> Receiver<(f32, f32)> {
137 self.send_rate_channel.1.clone()
138 }
139
140 fn get_sent_packets_channel(&self) -> Receiver<usize> {
141 self.send_num_packets_channel.1.clone()
142 }
143
144 fn get_error_rate_channel(&self) -> Receiver<(f32, f32)> {
145 self.error_rate_channel.1.clone()
146 }
147}
148
149impl<S: Scenario> ReqwestGenerator<S> {
150 pub fn new(
152 scenario: S,
153 max_open_connections: usize,
154 worker_count: usize,
155 warmup: bool,
156 req: reqwest::Request,
157 ) -> Self {
158 ReqwestGenerator {
159 scenario,
160 max_open_connections,
161 req,
162 warmup,
163 worker_count,
164 send_rate_channel: bounded(10),
165 send_num_packets_channel: bounded(10),
166 error_rate_channel: bounded(10),
167 }
168 }
169
170 fn create_runtime(&self) -> Runtime {
171 if self.worker_count == 0 {
172 tokio::runtime::Runtime::new().unwrap()
173 } else {
174 tokio::runtime::Builder::new_multi_thread()
175 .worker_threads(self.worker_count)
176 .enable_all()
177 .build()
178 .unwrap()
179 }
180 }
181
182 async fn run_reqwests(&mut self) {
183 let max_tokio_spawn: usize = self.max_open_connections;
184 let mut send_rate: f32 = self.scenario.rate(Duration::from_millis(0));
185 let client = reqwest::Client::new();
186 let semaphore = Arc::new(Semaphore::new(max_tokio_spawn as usize));
187 let num_successful = Arc::new(AtomicUsize::new(0));
188 let num_error = Arc::new(AtomicUsize::new(0));
189 let mut send_delay = Duration::from_micros(1000000 / send_rate as u64);
190 let mut previos_count: usize = 0;
191 let mut previos_error: usize = 0;
192 let mut previos_time = Instant::now();
193 let mut start = Instant::now();
194 let mut elasped = start.elapsed();
195 let test_duration = self.scenario.duration();
196 let mut has_warmed_up = false;
197
198 while elasped.as_millis() < test_duration {
199 for _ in 0..10 {
200 let permit = semaphore.clone().acquire_owned().await.unwrap();
201 let ns = num_successful.clone();
202 let ne = num_error.clone();
203 let fut = client.execute(self.req.try_clone().unwrap());
204 tokio::spawn(async move {
205 let res = fut.await;
206 drop(permit);
207 match res {
208 Ok(r) => {
209 if r.status().is_success() {
210 ns.fetch_add(1, Ordering::Relaxed);
211 } else {
212 ne.fetch_add(1, Ordering::Relaxed);
213 }
214 }
215 _ => {
216 ne.fetch_add(1, Ordering::Relaxed);
217 }
218 }
219 });
220 }
221 std::thread::sleep(send_delay);
222
223 let prev_elasped = previos_time.elapsed();
224 if prev_elasped.as_millis() > 100 {
225 elasped = start.elapsed();
226 let suc = num_successful.load(Ordering::Relaxed);
227 let error = num_error.load(Ordering::Relaxed);
228
229 let delta: f64 = suc as f64 - previos_count as f64;
230 let suc_rate = delta as u128 * 1000000 / (prev_elasped.as_micros());
231
232 let delta: f64 = error as f64 - previos_error as f64;
233 let error_rate = delta as u128 * 1000000 / (prev_elasped.as_micros());
234
235 send_rate = self.scenario.rate(elasped);
236 previos_time = Instant::now();
237 previos_count = suc;
238 previos_error = error;
239
240 let rate_delta = suc_rate as f64 - send_rate as f64;
241
242 let adjustment = 2. / (1. + (-(rate_delta as f64) / 10000.).exp());
245
246 send_delay =
247 Duration::from_nanos((send_delay.as_nanos() as f64 * adjustment) as u64);
248
249 if self.warmup && !has_warmed_up {
250 if (adjustment - 1.).abs() < 0.01 {
251 has_warmed_up = true;
252 start = Instant::now();
253 }
254 } else {
255 let _ = self
256 .send_rate_channel
257 .0
258 .try_send((elasped.as_millis() as f32, suc_rate as f32));
259 let _ = self.send_num_packets_channel.0.try_send(suc);
260 let _ = self
261 .error_rate_channel
262 .0
263 .try_send((elasped.as_millis() as f32, error_rate as f32));
264 }
265 }
266 }
267
268 let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
270 }
271}
272
273async fn fire_other_thread(
276 max_tokio_spawn: usize,
277 test_duration: u128,
278 request: reqwest::Request,
279 num_packets_channel: Sender<usize>,
280 packet_rate_channel: Sender<(usize, f32)>,
281 error_rate_channel: Sender<(usize, f32)>,
282 thread_id: usize,
283) {
284 let client = reqwest::Client::new();
285 let semaphore = Arc::new(Semaphore::new(max_tokio_spawn as usize));
286 let num_successful = Arc::new(AtomicUsize::new(0));
287 let num_error = Arc::new(AtomicUsize::new(0));
288
289 let mut previos_count: usize = 0;
290 let mut previos_error: usize = 0;
291 let mut previos_time = Instant::now();
292 let start = Instant::now();
293
294 while start.elapsed().as_millis() < test_duration {
295 for _ in 0..10 {
296 let permit = semaphore.clone().acquire_owned().await.unwrap();
297 let ns = num_successful.clone();
298 let ne = num_error.clone();
299 let fut = client.execute(request.try_clone().unwrap());
300 tokio::spawn(async move {
301 let res = fut.await;
302 drop(permit);
303 match res {
304 Ok(r) => {
305 if r.status().is_success() {
306 ns.fetch_add(1, Ordering::Relaxed);
307 } else {
308 ne.fetch_add(1, Ordering::Relaxed);
309 }
310 }
311 _ => {
312 ne.fetch_add(1, Ordering::Relaxed);
313 }
314 }
315 });
316 }
317
318 let prev_elasped = previos_time.elapsed();
319 if prev_elasped.as_millis() > 100 {
320 let suc = num_successful.fetch_add(0, Ordering::Relaxed);
321 let error = num_error.load(Ordering::Relaxed);
322
323 let suc_delta: f64 = suc as f64 - previos_count as f64;
324 let rate = suc_delta as u128 * 1000000 / (previos_time.elapsed().as_micros());
325
326 let delta: f64 = error as f64 - previos_error as f64;
327 let error_rate = delta as u128 * 1000000 / (prev_elasped.as_micros());
328
329 previos_time = Instant::now();
330 previos_count = suc;
331 previos_error = error;
332
333 let _ = packet_rate_channel.try_send((thread_id, rate as f32));
334 let _ = num_packets_channel.try_send(suc_delta as usize);
335 let _ = error_rate_channel.try_send((thread_id, error_rate as f32));
336 }
337 }
338
339 let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
341}