rusmes_loadtest/
scenarios.rs1use anyhow::Result;
4use serde::{Deserialize, Serialize};
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::RwLock;
10use tokio::time::sleep;
11
12use crate::config::LoadTestConfig;
13use crate::generators::MessageGenerator;
14use crate::metrics::LoadTestMetrics;
15use crate::protocols::{ImapClient, SmtpClient};
16
17#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
19pub enum ScenarioType {
20 SmtpThroughput,
22 ConcurrentConnections,
24 MixedProtocol,
26 SustainedLoad,
28}
29
30impl ScenarioType {
31 pub fn create_runner(
33 &self,
34 config: &LoadTestConfig,
35 ) -> Box<dyn LoadTestScenario + Send + Sync> {
36 match self {
37 ScenarioType::SmtpThroughput => Box::new(SmtpThroughputScenario::new(config.clone())),
38 ScenarioType::ConcurrentConnections => {
39 Box::new(ConcurrentConnectionsScenario::new(config.clone()))
40 }
41 ScenarioType::MixedProtocol => Box::new(MixedProtocolScenario::new(config.clone())),
42 ScenarioType::SustainedLoad => Box::new(SustainedLoadScenario::new(config.clone())),
43 }
44 }
45}
46
47pub trait LoadTestScenario {
49 fn run(
51 &self,
52 metrics: Arc<RwLock<LoadTestMetrics>>,
53 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;
54}
55
56pub struct SmtpThroughputScenario {
58 config: LoadTestConfig,
59}
60
61impl SmtpThroughputScenario {
62 pub fn new(config: LoadTestConfig) -> Self {
63 Self { config }
64 }
65}
66
67impl LoadTestScenario for SmtpThroughputScenario {
68 fn run(
69 &self,
70 metrics: Arc<RwLock<LoadTestMetrics>>,
71 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
72 Box::pin(async move {
73 metrics.write().await.mark_started();
74
75 let (min_size, max_size) = self.config.message_size_range();
76 let generator = MessageGenerator::with_content_type(
77 min_size,
78 max_size,
79 self.config.message_content,
80 );
81
82 let duration = Duration::from_secs(self.config.duration_secs);
83 let start = Instant::now();
84
85 let mut tasks = vec![];
86
87 for _ in 0..self.config.concurrency {
88 let metrics = metrics.clone();
89 let config = self.config.clone();
90 let generator = generator.clone();
91
92 let task = tokio::spawn(async move {
93 while start.elapsed() < duration {
94 let message = generator.generate();
95 let request_start = Instant::now();
96
97 match SmtpClient::send_message(
98 &config.target_host,
99 config.target_port,
100 &message,
101 )
102 .await
103 {
104 Ok(bytes_received) => {
105 let latency = request_start.elapsed();
106 metrics.write().await.record_success(
107 latency,
108 message.len(),
109 bytes_received,
110 );
111 }
112 Err(e) => {
113 metrics.write().await.record_failure(e.to_string());
114 }
115 }
116
117 let delay = Duration::from_millis(1000 / config.message_rate);
119 sleep(delay).await;
120 }
121 });
122
123 tasks.push(task);
124 }
125
126 for task in tasks {
127 let _ = task.await;
128 }
129
130 metrics.write().await.mark_completed();
131 Ok(())
132 })
133 }
134}
135
136pub struct ConcurrentConnectionsScenario {
138 config: LoadTestConfig,
139}
140
141impl ConcurrentConnectionsScenario {
142 pub fn new(config: LoadTestConfig) -> Self {
143 Self { config }
144 }
145}
146
147impl LoadTestScenario for ConcurrentConnectionsScenario {
148 fn run(
149 &self,
150 metrics: Arc<RwLock<LoadTestMetrics>>,
151 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
152 Box::pin(async move {
153 metrics.write().await.mark_started();
154
155 let (min_size, max_size) = self.config.message_size_range();
156 let generator = MessageGenerator::with_content_type(
157 min_size,
158 max_size,
159 self.config.message_content,
160 );
161
162 let mut tasks = vec![];
163
164 for _ in 0..self.config.concurrency {
166 let metrics = metrics.clone();
167 let config = self.config.clone();
168 let generator = generator.clone();
169
170 let task = tokio::spawn(async move {
171 let message = generator.generate();
172 let request_start = Instant::now();
173
174 match SmtpClient::send_message(
175 &config.target_host,
176 config.target_port,
177 &message,
178 )
179 .await
180 {
181 Ok(bytes_received) => {
182 let latency = request_start.elapsed();
183 metrics.write().await.record_success(
184 latency,
185 message.len(),
186 bytes_received,
187 );
188 }
189 Err(e) => {
190 metrics.write().await.record_failure(e.to_string());
191 }
192 }
193 });
194
195 tasks.push(task);
196 }
197
198 for task in tasks {
199 let _ = task.await;
200 }
201
202 metrics.write().await.mark_completed();
203 Ok(())
204 })
205 }
206}
207
208pub struct MixedProtocolScenario {
210 config: LoadTestConfig,
211}
212
213impl MixedProtocolScenario {
214 pub fn new(config: LoadTestConfig) -> Self {
215 Self { config }
216 }
217}
218
219impl LoadTestScenario for MixedProtocolScenario {
220 fn run(
221 &self,
222 metrics: Arc<RwLock<LoadTestMetrics>>,
223 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
224 Box::pin(async move {
225 metrics.write().await.mark_started();
226
227 let (min_size, max_size) = self.config.message_size_range();
228 let generator = MessageGenerator::with_content_type(
229 min_size,
230 max_size,
231 self.config.message_content,
232 );
233
234 let duration = Duration::from_secs(self.config.duration_secs);
235 let start = Instant::now();
236
237 let mut tasks = vec![];
238
239 for i in 0..self.config.concurrency {
240 let metrics = metrics.clone();
241 let config = self.config.clone();
242 let generator = generator.clone();
243
244 let task = tokio::spawn(async move {
245 while start.elapsed() < duration {
246 if i % 2 == 0 {
247 let message = generator.generate();
249 let request_start = Instant::now();
250
251 match SmtpClient::send_message(
252 &config.target_host,
253 config.target_port,
254 &message,
255 )
256 .await
257 {
258 Ok(bytes_received) => {
259 let latency = request_start.elapsed();
260 metrics.write().await.record_success(
261 latency,
262 message.len(),
263 bytes_received,
264 );
265 }
266 Err(e) => {
267 metrics.write().await.record_failure(e.to_string());
268 }
269 }
270 } else {
271 let request_start = Instant::now();
273
274 match ImapClient::fetch_messages(
275 &config.target_host,
276 config.target_port + 100,
277 )
278 .await
279 {
280 Ok(bytes_received) => {
281 let latency = request_start.elapsed();
282 metrics.write().await.record_success(
283 latency,
284 0,
285 bytes_received,
286 );
287 }
288 Err(e) => {
289 metrics.write().await.record_failure(e.to_string());
290 }
291 }
292 }
293
294 sleep(Duration::from_millis(100)).await;
295 }
296 });
297
298 tasks.push(task);
299 }
300
301 for task in tasks {
302 let _ = task.await;
303 }
304
305 metrics.write().await.mark_completed();
306 Ok(())
307 })
308 }
309}
310
311pub struct SustainedLoadScenario {
313 config: LoadTestConfig,
314}
315
316impl SustainedLoadScenario {
317 pub fn new(config: LoadTestConfig) -> Self {
318 Self { config }
319 }
320}
321
322impl LoadTestScenario for SustainedLoadScenario {
323 fn run(
324 &self,
325 metrics: Arc<RwLock<LoadTestMetrics>>,
326 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
327 Box::pin(async move {
328 metrics.write().await.mark_started();
329
330 let (min_size, max_size) = self.config.message_size_range();
331 let generator = MessageGenerator::with_content_type(
332 min_size,
333 max_size,
334 self.config.message_content,
335 );
336
337 let duration = Duration::from_secs(self.config.duration_secs);
338 let start = Instant::now();
339
340 let mut tasks = vec![];
341
342 for _ in 0..self.config.concurrency {
343 let metrics = metrics.clone();
344 let config = self.config.clone();
345 let generator = generator.clone();
346
347 let task = tokio::spawn(async move {
348 while start.elapsed() < duration {
349 let message = generator.generate();
350 let request_start = Instant::now();
351
352 match SmtpClient::send_message(
353 &config.target_host,
354 config.target_port,
355 &message,
356 )
357 .await
358 {
359 Ok(bytes_received) => {
360 let latency = request_start.elapsed();
361 metrics.write().await.record_success(
362 latency,
363 message.len(),
364 bytes_received,
365 );
366 }
367 Err(e) => {
368 metrics.write().await.record_failure(e.to_string());
369 }
370 }
371
372 let delay = Duration::from_millis(1000);
374 sleep(delay).await;
375 }
376 });
377
378 tasks.push(task);
379 }
380
381 for task in tasks {
382 let _ = task.await;
383 }
384
385 metrics.write().await.mark_completed();
386 Ok(())
387 })
388 }
389}
390
391pub type ScenarioRunner = Box<dyn LoadTestScenario + Send + Sync>;
392
#[cfg(test)]
mod tests {
    use super::*;

    /// Every scenario variant can build a runner from the default configuration.
    #[test]
    fn test_scenario_creation() {
        let config = LoadTestConfig::default();
        let variants = [
            ScenarioType::SmtpThroughput,
            ScenarioType::ConcurrentConnections,
            ScenarioType::MixedProtocol,
            ScenarioType::SustainedLoad,
        ];

        for variant in variants.iter() {
            let _runner = variant.create_runner(&config);
        }
    }
}